Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id DA78C105B1 for ; Thu, 27 Aug 2015 11:25:19 +0000 (UTC) Received: (qmail 52349 invoked by uid 500); 27 Aug 2015 11:25:19 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 52239 invoked by uid 500); 27 Aug 2015 11:25:19 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 51556 invoked by uid 99); 27 Aug 2015 11:25:19 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 27 Aug 2015 11:25:19 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 37998E7E62; Thu, 27 Aug 2015 11:25:19 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sewen@apache.org To: commits@flink.apache.org Date: Thu, 27 Aug 2015 11:25:33 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [16/51] [abbrv] flink git commit: [FLINK-2386] [kafka connector] Add comments to all backported kafka sources and move them to 'org.apache.flink.kafka_backport' http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/Protocol.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/Protocol.java 
b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/Protocol.java new file mode 100644 index 0000000..f7c8981 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/Protocol.java @@ -0,0 +1,474 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.kafka_backport.common.protocol; + +import org.apache.flink.kafka_backport.common.protocol.types.ArrayOf; +import org.apache.flink.kafka_backport.common.protocol.types.Field; +import org.apache.flink.kafka_backport.common.protocol.types.Schema; +import org.apache.flink.kafka_backport.common.protocol.types.Type; + +// ---------------------------------------------------------------------------- +// This class is copied from the Apache Kafka project. +// +// The class is part of a "backport" of the new consumer API, in order to +// give Flink access to its functionality until the API is properly released. +// +// This is a temporary workaround! 
+// ---------------------------------------------------------------------------- + +public class Protocol { + + public static final Schema REQUEST_HEADER = new Schema(new Field("api_key", Type.INT16, "The id of the request type."), + new Field("api_version", Type.INT16, "The version of the API."), + new Field("correlation_id", + Type.INT32, + "A user-supplied integer value that will be passed back with the response"), + new Field("client_id", + Type.STRING, + "A user specified identifier for the client making the request.")); + + public static final Schema RESPONSE_HEADER = new Schema(new Field("correlation_id", + Type.INT32, + "The user-supplied value passed in with the request")); + + /* Metadata api */ + + public static final Schema METADATA_REQUEST_V0 = new Schema(new Field("topics", + new ArrayOf(Type.STRING), + "An array of topics to fetch metadata for. If no topics are specified fetch metadtata for all topics.")); + + public static final Schema BROKER = new Schema(new Field("node_id", Type.INT32, "The broker id."), + new Field("host", Type.STRING, "The hostname of the broker."), + new Field("port", + Type.INT32, + "The port on which the broker accepts requests.")); + + public static final Schema PARTITION_METADATA_V0 = new Schema(new Field("partition_error_code", + Type.INT16, + "The error code for the partition, if any."), + new Field("partition_id", + Type.INT32, + "The id of the partition."), + new Field("leader", + Type.INT32, + "The id of the broker acting as leader for this partition."), + new Field("replicas", + new ArrayOf(Type.INT32), + "The set of all nodes that host this partition."), + new Field("isr", + new ArrayOf(Type.INT32), + "The set of nodes that are in sync with the leader for this partition.")); + + public static final Schema TOPIC_METADATA_V0 = new Schema(new Field("topic_error_code", + Type.INT16, + "The error code for the given topic."), + new Field("topic", Type.STRING, "The name of the topic"), + new Field("partition_metadata", + 
new ArrayOf(PARTITION_METADATA_V0), + "Metadata for each partition of the topic.")); + + public static final Schema METADATA_RESPONSE_V0 = new Schema(new Field("brokers", + new ArrayOf(BROKER), + "Host and port information for all brokers."), + new Field("topic_metadata", + new ArrayOf(TOPIC_METADATA_V0))); + + public static final Schema[] METADATA_REQUEST = new Schema[] {METADATA_REQUEST_V0}; + public static final Schema[] METADATA_RESPONSE = new Schema[] {METADATA_RESPONSE_V0}; + + /* Produce api */ + + public static final Schema TOPIC_PRODUCE_DATA_V0 = new Schema(new Field("topic", Type.STRING), + new Field("data", new ArrayOf(new Schema(new Field("partition", Type.INT32), + new Field("record_set", Type.BYTES))))); + + public static final Schema PRODUCE_REQUEST_V0 = new Schema(new Field("acks", + Type.INT16, + "The number of nodes that should replicate the produce before returning. -1 indicates the full ISR."), + new Field("timeout", Type.INT32, "The time to await a response in ms."), + new Field("topic_data", new ArrayOf(TOPIC_PRODUCE_DATA_V0))); + + public static final Schema PRODUCE_RESPONSE_V0 = new Schema(new Field("responses", + new ArrayOf(new Schema(new Field("topic", Type.STRING), + new Field("partition_responses", + new ArrayOf(new Schema(new Field("partition", + Type.INT32), + new Field("error_code", + Type.INT16), + new Field("base_offset", + Type.INT64)))))))); + + public static final Schema[] PRODUCE_REQUEST = new Schema[] {PRODUCE_REQUEST_V0}; + public static final Schema[] PRODUCE_RESPONSE = new Schema[] {PRODUCE_RESPONSE_V0}; + + /* Offset commit api */ + public static final Schema OFFSET_COMMIT_REQUEST_PARTITION_V0 = new Schema(new Field("partition", + Type.INT32, + "Topic partition id."), + new Field("offset", + Type.INT64, + "Message offset to be committed."), + new Field("metadata", + Type.STRING, + "Any associated metadata the client wants to keep.")); + + public static final Schema OFFSET_COMMIT_REQUEST_PARTITION_V1 = new Schema(new 
Field("partition", + Type.INT32, + "Topic partition id."), + new Field("offset", + Type.INT64, + "Message offset to be committed."), + new Field("timestamp", + Type.INT64, + "Timestamp of the commit"), + new Field("metadata", + Type.STRING, + "Any associated metadata the client wants to keep.")); + + public static final Schema OFFSET_COMMIT_REQUEST_PARTITION_V2 = new Schema(new Field("partition", + Type.INT32, + "Topic partition id."), + new Field("offset", + Type.INT64, + "Message offset to be committed."), + new Field("metadata", + Type.STRING, + "Any associated metadata the client wants to keep.")); + + public static final Schema OFFSET_COMMIT_REQUEST_TOPIC_V0 = new Schema(new Field("topic", + Type.STRING, + "Topic to commit."), + new Field("partitions", + new ArrayOf(OFFSET_COMMIT_REQUEST_PARTITION_V0), + "Partitions to commit offsets.")); + + public static final Schema OFFSET_COMMIT_REQUEST_TOPIC_V1 = new Schema(new Field("topic", + Type.STRING, + "Topic to commit."), + new Field("partitions", + new ArrayOf(OFFSET_COMMIT_REQUEST_PARTITION_V1), + "Partitions to commit offsets.")); + + public static final Schema OFFSET_COMMIT_REQUEST_TOPIC_V2 = new Schema(new Field("topic", + Type.STRING, + "Topic to commit."), + new Field("partitions", + new ArrayOf(OFFSET_COMMIT_REQUEST_PARTITION_V2), + "Partitions to commit offsets.")); + + public static final Schema OFFSET_COMMIT_REQUEST_V0 = new Schema(new Field("group_id", + Type.STRING, + "The consumer group id."), + new Field("topics", + new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V0), + "Topics to commit offsets.")); + + public static final Schema OFFSET_COMMIT_REQUEST_V1 = new Schema(new Field("group_id", + Type.STRING, + "The consumer group id."), + new Field("group_generation_id", + Type.INT32, + "The generation of the consumer group."), + new Field("consumer_id", + Type.STRING, + "The consumer id assigned by the group coordinator."), + new Field("topics", + new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V1), + "Topics to 
commit offsets.")); + + public static final Schema OFFSET_COMMIT_REQUEST_V2 = new Schema(new Field("group_id", + Type.STRING, + "The consumer group id."), + new Field("group_generation_id", + Type.INT32, + "The generation of the consumer group."), + new Field("consumer_id", + Type.STRING, + "The consumer id assigned by the group coordinator."), + new Field("retention_time", + Type.INT64, + "Time period in ms to retain the offset."), + new Field("topics", + new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V2), + "Topics to commit offsets.")); + + public static final Schema OFFSET_COMMIT_RESPONSE_PARTITION_V0 = new Schema(new Field("partition", + Type.INT32, + "Topic partition id."), + new Field("error_code", + Type.INT16)); + + public static final Schema OFFSET_COMMIT_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", Type.STRING), + new Field("partition_responses", + new ArrayOf(OFFSET_COMMIT_RESPONSE_PARTITION_V0))); + + public static final Schema OFFSET_COMMIT_RESPONSE_V0 = new Schema(new Field("responses", + new ArrayOf(OFFSET_COMMIT_RESPONSE_TOPIC_V0))); + + public static final Schema[] OFFSET_COMMIT_REQUEST = new Schema[] {OFFSET_COMMIT_REQUEST_V0, OFFSET_COMMIT_REQUEST_V1, OFFSET_COMMIT_REQUEST_V2}; + + /* The response types for V0, V1 and V2 of OFFSET_COMMIT_REQUEST are the same. */ + public static final Schema OFFSET_COMMIT_RESPONSE_V1 = OFFSET_COMMIT_RESPONSE_V0; + public static final Schema OFFSET_COMMIT_RESPONSE_V2 = OFFSET_COMMIT_RESPONSE_V0; + + public static final Schema[] OFFSET_COMMIT_RESPONSE = new Schema[] {OFFSET_COMMIT_RESPONSE_V0, OFFSET_COMMIT_RESPONSE_V1, OFFSET_COMMIT_RESPONSE_V2}; + + /* Offset fetch api */ + + /* + * Wire formats of version 0 and 1 are the same, but with different functionality. + * Version 0 will read the offsets from ZK; + * Version 1 will read the offsets from Kafka. 
+ */ + public static final Schema OFFSET_FETCH_REQUEST_PARTITION_V0 = new Schema(new Field("partition", + Type.INT32, + "Topic partition id.")); + + public static final Schema OFFSET_FETCH_REQUEST_TOPIC_V0 = new Schema(new Field("topic", + Type.STRING, + "Topic to fetch offset."), + new Field("partitions", + new ArrayOf(OFFSET_FETCH_REQUEST_PARTITION_V0), + "Partitions to fetch offsets.")); + + public static final Schema OFFSET_FETCH_REQUEST_V0 = new Schema(new Field("group_id", + Type.STRING, + "The consumer group id."), + new Field("topics", + new ArrayOf(OFFSET_FETCH_REQUEST_TOPIC_V0), + "Topics to fetch offsets.")); + + public static final Schema OFFSET_FETCH_RESPONSE_PARTITION_V0 = new Schema(new Field("partition", + Type.INT32, + "Topic partition id."), + new Field("offset", + Type.INT64, + "Last committed message offset."), + new Field("metadata", + Type.STRING, + "Any associated metadata the client wants to keep."), + new Field("error_code", Type.INT16)); + + public static final Schema OFFSET_FETCH_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", Type.STRING), + new Field("partition_responses", + new ArrayOf(OFFSET_FETCH_RESPONSE_PARTITION_V0))); + + public static final Schema OFFSET_FETCH_RESPONSE_V0 = new Schema(new Field("responses", + new ArrayOf(OFFSET_FETCH_RESPONSE_TOPIC_V0))); + + public static final Schema OFFSET_FETCH_REQUEST_V1 = OFFSET_FETCH_REQUEST_V0; + public static final Schema OFFSET_FETCH_RESPONSE_V1 = OFFSET_FETCH_RESPONSE_V0; + + public static final Schema[] OFFSET_FETCH_REQUEST = new Schema[] {OFFSET_FETCH_REQUEST_V0, OFFSET_FETCH_REQUEST_V1}; + public static final Schema[] OFFSET_FETCH_RESPONSE = new Schema[] {OFFSET_FETCH_RESPONSE_V0, OFFSET_FETCH_RESPONSE_V1}; + + /* List offset api */ + public static final Schema LIST_OFFSET_REQUEST_PARTITION_V0 = new Schema(new Field("partition", + Type.INT32, + "Topic partition id."), + new Field("timestamp", Type.INT64, "Timestamp."), + new Field("max_num_offsets", + Type.INT32, + "Maximum 
offsets to return.")); + + public static final Schema LIST_OFFSET_REQUEST_TOPIC_V0 = new Schema(new Field("topic", + Type.STRING, + "Topic to list offset."), + new Field("partitions", + new ArrayOf(LIST_OFFSET_REQUEST_PARTITION_V0), + "Partitions to list offset.")); + + public static final Schema LIST_OFFSET_REQUEST_V0 = new Schema(new Field("replica_id", + Type.INT32, + "Broker id of the follower. For normal consumers, use -1."), + new Field("topics", + new ArrayOf(LIST_OFFSET_REQUEST_TOPIC_V0), + "Topics to list offsets.")); + + public static final Schema LIST_OFFSET_RESPONSE_PARTITION_V0 = new Schema(new Field("partition", + Type.INT32, + "Topic partition id."), + new Field("error_code", Type.INT16), + new Field("offsets", + new ArrayOf(Type.INT64), + "A list of offsets.")); + + public static final Schema LIST_OFFSET_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", Type.STRING), + new Field("partition_responses", + new ArrayOf(LIST_OFFSET_RESPONSE_PARTITION_V0))); + + public static final Schema LIST_OFFSET_RESPONSE_V0 = new Schema(new Field("responses", + new ArrayOf(LIST_OFFSET_RESPONSE_TOPIC_V0))); + + public static final Schema[] LIST_OFFSET_REQUEST = new Schema[] {LIST_OFFSET_REQUEST_V0}; + public static final Schema[] LIST_OFFSET_RESPONSE = new Schema[] {LIST_OFFSET_RESPONSE_V0}; + + /* Fetch api */ + public static final Schema FETCH_REQUEST_PARTITION_V0 = new Schema(new Field("partition", + Type.INT32, + "Topic partition id."), + new Field("fetch_offset", + Type.INT64, + "Message offset."), + new Field("max_bytes", + Type.INT32, + "Maximum bytes to fetch.")); + + public static final Schema FETCH_REQUEST_TOPIC_V0 = new Schema(new Field("topic", Type.STRING, "Topic to fetch."), + new Field("partitions", + new ArrayOf(FETCH_REQUEST_PARTITION_V0), + "Partitions to fetch.")); + + public static final Schema FETCH_REQUEST_V0 = new Schema(new Field("replica_id", + Type.INT32, + "Broker id of the follower. 
For normal consumers, use -1."), + new Field("max_wait_time", + Type.INT32, + "Maximum time in ms to wait for the response."), + new Field("min_bytes", + Type.INT32, + "Minimum bytes to accumulate in the response."), + new Field("topics", + new ArrayOf(FETCH_REQUEST_TOPIC_V0), + "Topics to fetch.")); + + public static final Schema FETCH_RESPONSE_PARTITION_V0 = new Schema(new Field("partition", + Type.INT32, + "Topic partition id."), + new Field("error_code", Type.INT16), + new Field("high_watermark", + Type.INT64, + "Last committed offset."), + new Field("record_set", Type.BYTES)); + + public static final Schema FETCH_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", Type.STRING), + new Field("partition_responses", + new ArrayOf(FETCH_RESPONSE_PARTITION_V0))); + + public static final Schema FETCH_RESPONSE_V0 = new Schema(new Field("responses", + new ArrayOf(FETCH_RESPONSE_TOPIC_V0))); + + public static final Schema[] FETCH_REQUEST = new Schema[] {FETCH_REQUEST_V0}; + public static final Schema[] FETCH_RESPONSE = new Schema[] {FETCH_RESPONSE_V0}; + + /* Consumer metadata api */ + public static final Schema CONSUMER_METADATA_REQUEST_V0 = new Schema(new Field("group_id", + Type.STRING, + "The consumer group id.")); + + public static final Schema CONSUMER_METADATA_RESPONSE_V0 = new Schema(new Field("error_code", Type.INT16), + new Field("coordinator", + BROKER, + "Host and port information for the coordinator for a consumer group.")); + + public static final Schema[] CONSUMER_METADATA_REQUEST = new Schema[] {CONSUMER_METADATA_REQUEST_V0}; + public static final Schema[] CONSUMER_METADATA_RESPONSE = new Schema[] {CONSUMER_METADATA_RESPONSE_V0}; + + /* Join group api */ + public static final Schema JOIN_GROUP_REQUEST_V0 = new Schema(new Field("group_id", + Type.STRING, + "The consumer group id."), + new Field("session_timeout", + Type.INT32, + "The coordinator considers the consumer dead if it receives no heartbeat after this timeout in ms."), + new Field("topics", + new 
ArrayOf(Type.STRING), + "An array of topics to subscribe to."), + new Field("consumer_id", + Type.STRING, + "The assigned consumer id or an empty string for a new consumer."), + new Field("partition_assignment_strategy", + Type.STRING, + "The strategy for the coordinator to assign partitions.")); + + public static final Schema JOIN_GROUP_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", Type.STRING), + new Field("partitions", new ArrayOf(Type.INT32))); + public static final Schema JOIN_GROUP_RESPONSE_V0 = new Schema(new Field("error_code", Type.INT16), + new Field("group_generation_id", + Type.INT32, + "The generation of the consumer group."), + new Field("consumer_id", + Type.STRING, + "The consumer id assigned by the group coordinator."), + new Field("assigned_partitions", + new ArrayOf(JOIN_GROUP_RESPONSE_TOPIC_V0))); + + public static final Schema[] JOIN_GROUP_REQUEST = new Schema[] {JOIN_GROUP_REQUEST_V0}; + public static final Schema[] JOIN_GROUP_RESPONSE = new Schema[] {JOIN_GROUP_RESPONSE_V0}; + + /* Heartbeat api */ + public static final Schema HEARTBEAT_REQUEST_V0 = new Schema(new Field("group_id", Type.STRING, "The consumer group id."), + new Field("group_generation_id", + Type.INT32, + "The generation of the consumer group."), + new Field("consumer_id", + Type.STRING, + "The consumer id assigned by the group coordinator.")); + + public static final Schema HEARTBEAT_RESPONSE_V0 = new Schema(new Field("error_code", Type.INT16)); + + public static final Schema[] HEARTBEAT_REQUEST = new Schema[] {HEARTBEAT_REQUEST_V0}; + public static final Schema[] HEARTBEAT_RESPONSE = new Schema[] {HEARTBEAT_RESPONSE_V0}; + + /* an array of all requests and responses with all schema versions */ + public static final Schema[][] REQUESTS = new Schema[ApiKeys.MAX_API_KEY + 1][]; + public static final Schema[][] RESPONSES = new Schema[ApiKeys.MAX_API_KEY + 1][]; + + /* the latest version of each api */ + public static final short[] CURR_VERSION = new short[ApiKeys.MAX_API_KEY 
+ 1]; + + static { + REQUESTS[ApiKeys.PRODUCE.id] = PRODUCE_REQUEST; + REQUESTS[ApiKeys.FETCH.id] = FETCH_REQUEST; + REQUESTS[ApiKeys.LIST_OFFSETS.id] = LIST_OFFSET_REQUEST; + REQUESTS[ApiKeys.METADATA.id] = METADATA_REQUEST; + REQUESTS[ApiKeys.LEADER_AND_ISR.id] = new Schema[] {}; + REQUESTS[ApiKeys.STOP_REPLICA.id] = new Schema[] {}; + REQUESTS[ApiKeys.UPDATE_METADATA_KEY.id] = new Schema[] {}; + REQUESTS[ApiKeys.CONTROLLED_SHUTDOWN_KEY.id] = new Schema[] {}; + REQUESTS[ApiKeys.OFFSET_COMMIT.id] = OFFSET_COMMIT_REQUEST; + REQUESTS[ApiKeys.OFFSET_FETCH.id] = OFFSET_FETCH_REQUEST; + REQUESTS[ApiKeys.CONSUMER_METADATA.id] = CONSUMER_METADATA_REQUEST; + REQUESTS[ApiKeys.JOIN_GROUP.id] = JOIN_GROUP_REQUEST; + REQUESTS[ApiKeys.HEARTBEAT.id] = HEARTBEAT_REQUEST; + + RESPONSES[ApiKeys.PRODUCE.id] = PRODUCE_RESPONSE; + RESPONSES[ApiKeys.FETCH.id] = FETCH_RESPONSE; + RESPONSES[ApiKeys.LIST_OFFSETS.id] = LIST_OFFSET_RESPONSE; + RESPONSES[ApiKeys.METADATA.id] = METADATA_RESPONSE; + RESPONSES[ApiKeys.LEADER_AND_ISR.id] = new Schema[] {}; + RESPONSES[ApiKeys.STOP_REPLICA.id] = new Schema[] {}; + RESPONSES[ApiKeys.UPDATE_METADATA_KEY.id] = new Schema[] {}; + RESPONSES[ApiKeys.CONTROLLED_SHUTDOWN_KEY.id] = new Schema[] {}; + RESPONSES[ApiKeys.OFFSET_COMMIT.id] = OFFSET_COMMIT_RESPONSE; + RESPONSES[ApiKeys.OFFSET_FETCH.id] = OFFSET_FETCH_RESPONSE; + RESPONSES[ApiKeys.CONSUMER_METADATA.id] = CONSUMER_METADATA_RESPONSE; + RESPONSES[ApiKeys.JOIN_GROUP.id] = JOIN_GROUP_RESPONSE; + RESPONSES[ApiKeys.HEARTBEAT.id] = HEARTBEAT_RESPONSE; + + /* set the maximum version of each api */ + for (ApiKeys api : ApiKeys.values()) + CURR_VERSION[api.id] = (short) (REQUESTS[api.id].length - 1); + + /* sanity check that we have the same number of request and response versions for each api */ + for (ApiKeys api : ApiKeys.values()) + if (REQUESTS[api.id].length != RESPONSES[api.id].length) + throw new IllegalStateException(REQUESTS[api.id].length + " request versions for api " + api.name + + " but " + 
RESPONSES[api.id].length + " response versions."); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/SecurityProtocol.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/SecurityProtocol.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/SecurityProtocol.java new file mode 100644 index 0000000..ab5a607 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/SecurityProtocol.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// ----------------------------------------------------------------------------
//  This class is copied from the Apache Kafka project.
//
//  The class is part of a "backport" of the new consumer API, in order to
//  give Flink access to its functionality until the API is properly released.
//
//  This is a temporary workaround!
// ----------------------------------------------------------------------------

/**
 * The security protocols a channel can use, each with a numeric id and a name.
 *
 * <p>Protocols can be looked up by id through {@link #forId(Short)} and {@link #getName(int)};
 * {@link #getNames()} lists every protocol name in declaration order.
 */
public enum SecurityProtocol {
    /** Un-authenticated, non-encrypted channel */
    PLAINTEXT(0, "PLAINTEXT"),
    /** Currently identical to PLAINTEXT and used for testing only. We may implement extra instrumentation when testing channel code. */
    TRACE(Short.MAX_VALUE, "TRACE");

    /** Lookup table from protocol id to protocol; read-only after class initialization. */
    private static final Map<Short, SecurityProtocol> CODE_TO_SECURITY_PROTOCOL;

    /** Names of all protocols in declaration order; read-only after class initialization. */
    private static final List<String> NAMES;

    static {
        Map<Short, SecurityProtocol> byId = new HashMap<Short, SecurityProtocol>();
        List<String> names = new ArrayList<String>();
        for (SecurityProtocol proto : SecurityProtocol.values()) {
            byId.put(proto.id, proto);
            names.add(proto.name);
        }
        // Wrapped so that callers of getNames() cannot alter the shared tables.
        CODE_TO_SECURITY_PROTOCOL = Collections.unmodifiableMap(byId);
        NAMES = Collections.unmodifiableList(names);
    }

    /** The permanent and immutable id of a security protocol -- this can't change, and must match kafka.cluster.SecurityProtocol */
    public final short id;

    /** Name of the security protocol. This may be used by client configuration. */
    public final String name;

    private SecurityProtocol(int id, String name) {
        this.id = (short) id;
        this.name = name;
    }

    /**
     * Returns the name of the protocol registered under the given id.
     *
     * @param id the protocol id; it is narrowed to a {@code short} before the lookup
     * @return the protocol name
     * @throws NullPointerException if no protocol is registered under the id
     */
    public static String getName(int id) {
        return CODE_TO_SECURITY_PROTOCOL.get((short) id).name;
    }

    /**
     * Returns the names of all protocols in declaration order.
     *
     * @return an unmodifiable list of protocol names
     */
    public static List<String> getNames() {
        return NAMES;
    }

    /**
     * Returns the protocol registered under the given id.
     *
     * @param id the protocol id
     * @return the matching protocol, or {@code null} if there is none
     */
    public static SecurityProtocol forId(Short id) {
        return CODE_TO_SECURITY_PROTOCOL.get(id);
    }

}
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.kafka_backport.common.protocol.types; + +import java.nio.ByteBuffer; + +// ---------------------------------------------------------------------------- +// This class is copied from the Apache Kafka project. +// +// The class is part of a "backport" of the new consumer API, in order to +// give Flink access to its functionality until the API is properly released. +// +// This is a temporary workaround! +// ---------------------------------------------------------------------------- + +/** + * Represents a type for an array of a particular type + */ +public class ArrayOf extends Type { + + private final Type type; + + public ArrayOf(Type type) { + this.type = type; + } + + @Override + public void write(ByteBuffer buffer, Object o) { + Object[] objs = (Object[]) o; + int size = objs.length; + buffer.putInt(size); + for (int i = 0; i < size; i++) + type.write(buffer, objs[i]); + } + + @Override + public Object read(ByteBuffer buffer) { + int size = buffer.getInt(); + Object[] objs = new Object[size]; + for (int i = 0; i < size; i++) + objs[i] = type.read(buffer); + return objs; + } + + @Override + public int sizeOf(Object o) { + Object[] objs = (Object[]) o; + int size = 4; + for (int i = 0; i < objs.length; i++) + size += type.sizeOf(objs[i]); + return size; + } + + public Type type() { + return type; + } + + @Override + public String toString() { + return "ARRAY(" + type + ")"; + } + + @Override + public Object[] validate(Object item) { + try { + Object[] array = (Object[]) item; + for (int i = 0; i < array.length; i++) + type.validate(array[i]); + return array; + } catch (ClassCastException e) { + throw new SchemaException("Not an Object[]."); + } + } +} 
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/Field.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/Field.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/Field.java new file mode 100644 index 0000000..b7d7720 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/Field.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.kafka_backport.common.protocol.types; + +// ---------------------------------------------------------------------------- +// This class is copied from the Apache Kafka project. 
+// +// The class is part of a "backport" of the new consumer API, in order to +// give Flink access to its functionality until the API is properly released. +// +// This is a temporary workaround! +// ---------------------------------------------------------------------------- + +/** + * A field in a schema + */ +public class Field { + + public static final Object NO_DEFAULT = new Object(); + + final int index; + public final String name; + public final Type type; + public final Object defaultValue; + public final String doc; + final Schema schema; + + /** + * Create the field. + * + * @throws SchemaException If the default value is not primitive and the validation fails + */ + public Field(int index, String name, Type type, String doc, Object defaultValue, Schema schema) { + this.index = index; + this.name = name; + this.type = type; + this.doc = doc; + this.defaultValue = defaultValue; + this.schema = schema; + if (defaultValue != NO_DEFAULT) + type.validate(defaultValue); + } + + public Field(int index, String name, Type type, String doc, Object defaultValue) { + this(index, name, type, doc, defaultValue, null); + } + + public Field(String name, Type type, String doc, Object defaultValue) { + this(-1, name, type, doc, defaultValue); + } + + public Field(String name, Type type, String doc) { + this(name, type, doc, NO_DEFAULT); + } + + public Field(String name, Type type) { + this(name, type, ""); + } + + public Type type() { + return type; + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/Schema.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/Schema.java 
b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/Schema.java new file mode 100644 index 0000000..7adac52 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/Schema.java @@ -0,0 +1,168 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.flink.kafka_backport.common.protocol.types; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; + +// ---------------------------------------------------------------------------- +// This class is copied from the Apache Kafka project. +// +// The class is part of a "backport" of the new consumer API, in order to +// give Flink access to its functionality until the API is properly released. +// +// This is a temporary workaround! 
+// ---------------------------------------------------------------------------- + +/** + * The schema for a compound record definition + */ +public class Schema extends Type { + + private final Field[] fields; + private final Map fieldsByName; + + /** + * Construct the schema with a given list of its field values + * + * @throws SchemaException If the given list have duplicate fields + */ + public Schema(Field... fs) { + this.fields = new Field[fs.length]; + this.fieldsByName = new HashMap(); + for (int i = 0; i < this.fields.length; i++) { + Field field = fs[i]; + if (fieldsByName.containsKey(field.name)) + throw new SchemaException("Schema contains a duplicate field: " + field.name); + this.fields[i] = new Field(i, field.name, field.type, field.doc, field.defaultValue, this); + this.fieldsByName.put(fs[i].name, this.fields[i]); + } + } + + /** + * Write a struct to the buffer + */ + public void write(ByteBuffer buffer, Object o) { + Struct r = (Struct) o; + for (int i = 0; i < fields.length; i++) { + Field f = fields[i]; + try { + Object value = f.type().validate(r.get(f)); + f.type.write(buffer, value); + } catch (Exception e) { + throw new SchemaException("Error writing field '" + f.name + + "': " + + (e.getMessage() == null ? e.getClass().getName() : e.getMessage())); + } + } + } + + /** + * Read a struct from the buffer + */ + public Object read(ByteBuffer buffer) { + Object[] objects = new Object[fields.length]; + for (int i = 0; i < fields.length; i++) { + try { + objects[i] = fields[i].type.read(buffer); + } catch (Exception e) { + throw new SchemaException("Error reading field '" + fields[i].name + + "': " + + (e.getMessage() == null ? 
e.getClass().getName() : e.getMessage())); + } + } + return new Struct(this, objects); + } + + /** + * The size of the given record + */ + public int sizeOf(Object o) { + int size = 0; + Struct r = (Struct) o; + for (int i = 0; i < fields.length; i++) + size += fields[i].type.sizeOf(r.get(fields[i])); + return size; + } + + /** + * The number of fields in this schema + */ + public int numFields() { + return this.fields.length; + } + + /** + * Get a field by its slot in the record array + * + * @param slot The slot at which this field sits + * @return The field + */ + public Field get(int slot) { + return this.fields[slot]; + } + + /** + * Get a field by its name + * + * @param name The name of the field + * @return The field + */ + public Field get(String name) { + return this.fieldsByName.get(name); + } + + /** + * Get all the fields in this schema + */ + public Field[] fields() { + return this.fields; + } + + /** + * Display a string representation of the schema + */ + public String toString() { + StringBuilder b = new StringBuilder(); + b.append('{'); + for (int i = 0; i < this.fields.length; i++) { + b.append(this.fields[i].name); + b.append(':'); + b.append(this.fields[i].type()); + if (i < this.fields.length - 1) + b.append(','); + } + b.append("}"); + return b.toString(); + } + + @Override + public Struct validate(Object item) { + try { + Struct struct = (Struct) item; + for (int i = 0; i < this.fields.length; i++) { + Field field = this.fields[i]; + try { + field.type.validate(struct.get(field)); + } catch (SchemaException e) { + throw new SchemaException("Invalid value for field '" + field.name + "': " + e.getMessage()); + } + } + return struct; + } catch (ClassCastException e) { + throw new SchemaException("Not a Struct."); + } + } + +} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/SchemaException.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/SchemaException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/SchemaException.java new file mode 100644 index 0000000..86c141e --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/SchemaException.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.kafka_backport.common.protocol.types; + +import org.apache.flink.kafka_backport.common.KafkaException; + +// ---------------------------------------------------------------------------- +// This class is copied from the Apache Kafka project. 
+// +// The class is part of a "backport" of the new consumer API, in order to +// give Flink access to its functionality until the API is properly released. +// +// This is a temporary workaround! +// ---------------------------------------------------------------------------- + +/** + * Thrown if the protocol schema validation fails while parsing request or response. + */ +public class SchemaException extends KafkaException { + + private static final long serialVersionUID = 1L; + + public SchemaException(String message) { + super(message); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/Struct.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/Struct.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/Struct.java new file mode 100644 index 0000000..482fe9d --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/Struct.java @@ -0,0 +1,338 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.flink.kafka_backport.common.protocol.types; + +import java.nio.ByteBuffer; +import java.util.Arrays; + +// ---------------------------------------------------------------------------- +// This class is copied from the Apache Kafka project. +// +// The class is part of a "backport" of the new consumer API, in order to +// give Flink access to its functionality until the API is properly released. +// +// This is a temporary workaround! +// ---------------------------------------------------------------------------- + +/** + * A record that can be serialized and deserialized according to a pre-defined schema + */ +public class Struct { + private final Schema schema; + private final Object[] values; + + Struct(Schema schema, Object[] values) { + this.schema = schema; + this.values = values; + } + + public Struct(Schema schema) { + this.schema = schema; + this.values = new Object[this.schema.numFields()]; + } + + /** + * The schema for this struct. + */ + public Schema schema() { + return this.schema; + } + + /** + * Return the value of the given pre-validated field, or if the value is missing return the default value. + * + * @param field The field for which to get the default value + * @throws SchemaException if the field has no value and has no default. 
+ */ + private Object getFieldOrDefault(Field field) { + Object value = this.values[field.index]; + if (value != null) + return value; + else if (field.defaultValue != Field.NO_DEFAULT) + return field.defaultValue; + else + throw new SchemaException("Missing value for field '" + field.name + "' which has no default value."); + } + + /** + * Get the value for the field directly by the field index with no lookup needed (faster!) + * + * @param field The field to look up + * @return The value for that field. + * @throws SchemaException if the field has no value and has no default. + */ + public Object get(Field field) { + validateField(field); + return getFieldOrDefault(field); + } + + /** + * Get the record value for the field with the given name by doing a hash table lookup (slower!) + * + * @param name The name of the field + * @return The value in the record + * @throws SchemaException If no such field exists + */ + public Object get(String name) { + Field field = schema.get(name); + if (field == null) + throw new SchemaException("No such field: " + name); + return getFieldOrDefault(field); + } + + /** + * Check if the struct contains a field. + * @param name + * @return Whether a field exists. 
+ */ + public boolean hasField(String name) { + return schema.get(name) != null; + } + + public Struct getStruct(Field field) { + return (Struct) get(field); + } + + public Struct getStruct(String name) { + return (Struct) get(name); + } + + public Short getShort(Field field) { + return (Short) get(field); + } + + public Short getShort(String name) { + return (Short) get(name); + } + + public Integer getInt(Field field) { + return (Integer) get(field); + } + + public Integer getInt(String name) { + return (Integer) get(name); + } + + public Long getLong(Field field) { + return (Long) get(field); + } + + public Long getLong(String name) { + return (Long) get(name); + } + + public Object[] getArray(Field field) { + return (Object[]) get(field); + } + + public Object[] getArray(String name) { + return (Object[]) get(name); + } + + public String getString(Field field) { + return (String) get(field); + } + + public String getString(String name) { + return (String) get(name); + } + + public ByteBuffer getBytes(Field field) { + return (ByteBuffer) get(field); + } + + public ByteBuffer getBytes(String name) { + return (ByteBuffer) get(name); + } + + /** + * Set the given field to the specified value + * + * @param field The field + * @param value The value + * @throws SchemaException If the validation of the field failed + */ + public Struct set(Field field, Object value) { + validateField(field); + this.values[field.index] = value; + return this; + } + + /** + * Set the field specified by the given name to the value + * + * @param name The name of the field + * @param value The value to set + * @throws SchemaException If the field is not known + */ + public Struct set(String name, Object value) { + Field field = this.schema.get(name); + if (field == null) + throw new SchemaException("Unknown field: " + name); + this.values[field.index] = value; + return this; + } + + /** + * Create a struct for the schema of a container type (struct or array). 
Note that for array type, this method + * assumes that the type is an array of schema and creates a struct of that schema. Arrays of other types can't be + * instantiated with this method. + * + * @param field The field to create an instance of + * @return The struct + * @throws SchemaException If the given field is not a container type + */ + public Struct instance(Field field) { + validateField(field); + if (field.type() instanceof Schema) { + return new Struct((Schema) field.type()); + } else if (field.type() instanceof ArrayOf) { + ArrayOf array = (ArrayOf) field.type(); + return new Struct((Schema) array.type()); + } else { + throw new SchemaException("Field '" + field.name + "' is not a container type, it is of type " + field.type()); + } + } + + /** + * Create a struct instance for the given field which must be a container type (struct or array) + * + * @param field The name of the field to create (field must be a schema type) + * @return The struct + * @throws SchemaException If the given field is not a container type + */ + public Struct instance(String field) { + return instance(schema.get(field)); + } + + /** + * Empty all the values from this record + */ + public void clear() { + Arrays.fill(this.values, null); + } + + /** + * Get the serialized size of this object + */ + public int sizeOf() { + return this.schema.sizeOf(this); + } + + /** + * Write this struct to a buffer + */ + public void writeTo(ByteBuffer buffer) { + this.schema.write(buffer, this); + } + + /** + * Ensure the user doesn't try to access fields from the wrong schema + * + * @throws SchemaException If validation fails + */ + private void validateField(Field field) { + if (this.schema != field.schema) + throw new SchemaException("Attempt to access field '" + field.name + "' from a different schema instance."); + if (field.index > values.length) + throw new SchemaException("Invalid field index: " + field.index); + } + + /** + * Validate the contents of this struct against its schema + * 
+ * @throws SchemaException If validation fails + */ + public void validate() { + this.schema.validate(this); + } + + /** + * Create a byte buffer containing the serialized form of the values in this struct. This method can choose to break + * the struct into multiple ByteBuffers if need be. + */ + public ByteBuffer[] toBytes() { + ByteBuffer buffer = ByteBuffer.allocate(sizeOf()); + writeTo(buffer); + return new ByteBuffer[] {buffer}; + } + + @Override + public String toString() { + StringBuilder b = new StringBuilder(); + b.append('{'); + for (int i = 0; i < this.values.length; i++) { + Field f = this.schema.get(i); + b.append(f.name); + b.append('='); + if (f.type() instanceof ArrayOf) { + Object[] arrayValue = (Object[]) this.values[i]; + b.append('['); + for (int j = 0; j < arrayValue.length; j++) { + b.append(arrayValue[j]); + if (j < arrayValue.length - 1) + b.append(','); + } + b.append(']'); + } else + b.append(this.values[i]); + if (i < this.values.length - 1) + b.append(','); + } + b.append('}'); + return b.toString(); + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + for (int i = 0; i < this.values.length; i++) { + Field f = this.schema.get(i); + if (f.type() instanceof ArrayOf) { + Object[] arrayObject = (Object[]) this.get(f); + for (Object arrayItem: arrayObject) + result = prime * result + arrayItem.hashCode(); + } else { + result = prime * result + this.get(f).hashCode(); + } + } + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + Struct other = (Struct) obj; + if (schema != other.schema) + return false; + for (int i = 0; i < this.values.length; i++) { + Field f = this.schema.get(i); + Boolean result; + if (f.type() instanceof ArrayOf) { + result = Arrays.equals((Object[]) this.get(f), (Object[]) other.get(f)); + } else { + result = this.get(f).equals(other.get(f)); + } 
+ if (!result) + return false; + } + return true; + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/Type.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/Type.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/Type.java new file mode 100644 index 0000000..26bdd2f --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/Type.java @@ -0,0 +1,259 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.kafka_backport.common.protocol.types; + +import org.apache.flink.kafka_backport.common.utils.Utils; + +import java.nio.ByteBuffer; + +// ---------------------------------------------------------------------------- +// This class is copied from the Apache Kafka project. +// +// The class is part of a "backport" of the new consumer API, in order to +// give Flink access to its functionality until the API is properly released. +// +// This is a temporary workaround! +// ---------------------------------------------------------------------------- + +/** + * A serializable type + */ +public abstract class Type { + + /** + * Write the typed object to the buffer + * + * @throws SchemaException If the object is not valid for its type + */ + public abstract void write(ByteBuffer buffer, Object o); + + /** + * Read the typed object from the buffer + * + * @throws SchemaException If the object is not valid for its type + */ + public abstract Object read(ByteBuffer buffer); + + /** + * Validate the object. If succeeded return its typed object. 
+ * + * @throws SchemaException If validation failed + */ + public abstract Object validate(Object o); + + /** + * Return the size of the object in bytes + */ + public abstract int sizeOf(Object o); + + public static final Type INT8 = new Type() { + @Override + public void write(ByteBuffer buffer, Object o) { + buffer.put((Byte) o); + } + + @Override + public Object read(ByteBuffer buffer) { + return buffer.get(); + } + + @Override + public int sizeOf(Object o) { + return 1; + } + + @Override + public String toString() { + return "INT8"; + } + + @Override + public Byte validate(Object item) { + if (item instanceof Byte) + return (Byte) item; + else + throw new SchemaException(item + " is not a Byte."); + } + }; + + public static final Type INT16 = new Type() { + @Override + public void write(ByteBuffer buffer, Object o) { + buffer.putShort((Short) o); + } + + @Override + public Object read(ByteBuffer buffer) { + return buffer.getShort(); + } + + @Override + public int sizeOf(Object o) { + return 2; + } + + @Override + public String toString() { + return "INT16"; + } + + @Override + public Short validate(Object item) { + if (item instanceof Short) + return (Short) item; + else + throw new SchemaException(item + " is not a Short."); + } + }; + + public static final Type INT32 = new Type() { + @Override + public void write(ByteBuffer buffer, Object o) { + buffer.putInt((Integer) o); + } + + @Override + public Object read(ByteBuffer buffer) { + return buffer.getInt(); + } + + @Override + public int sizeOf(Object o) { + return 4; + } + + @Override + public String toString() { + return "INT32"; + } + + @Override + public Integer validate(Object item) { + if (item instanceof Integer) + return (Integer) item; + else + throw new SchemaException(item + " is not an Integer."); + } + }; + + public static final Type INT64 = new Type() { + @Override + public void write(ByteBuffer buffer, Object o) { + buffer.putLong((Long) o); + } + + @Override + public Object read(ByteBuffer 
buffer) { + return buffer.getLong(); + } + + @Override + public int sizeOf(Object o) { + return 8; + } + + @Override + public String toString() { + return "INT64"; + } + + @Override + public Long validate(Object item) { + if (item instanceof Long) + return (Long) item; + else + throw new SchemaException(item + " is not a Long."); + } + }; + + public static final Type STRING = new Type() { + @Override + public void write(ByteBuffer buffer, Object o) { + byte[] bytes = Utils.utf8((String) o); + if (bytes.length > Short.MAX_VALUE) + throw new SchemaException("String is longer than the maximum string length."); + buffer.putShort((short) bytes.length); + buffer.put(bytes); + } + + @Override + public Object read(ByteBuffer buffer) { + int length = buffer.getShort(); + byte[] bytes = new byte[length]; + buffer.get(bytes); + return Utils.utf8(bytes); + } + + @Override + public int sizeOf(Object o) { + return 2 + Utils.utf8Length((String) o); + } + + @Override + public String toString() { + return "STRING"; + } + + @Override + public String validate(Object item) { + if (item instanceof String) + return (String) item; + else + throw new SchemaException(item + " is not a String."); + } + }; + + public static final Type BYTES = new Type() { + @Override + public void write(ByteBuffer buffer, Object o) { + ByteBuffer arg = (ByteBuffer) o; + int pos = arg.position(); + buffer.putInt(arg.remaining()); + buffer.put(arg); + arg.position(pos); + } + + @Override + public Object read(ByteBuffer buffer) { + int size = buffer.getInt(); + ByteBuffer val = buffer.slice(); + val.limit(size); + buffer.position(buffer.position() + size); + return val; + } + + @Override + public int sizeOf(Object o) { + ByteBuffer buffer = (ByteBuffer) o; + return 4 + buffer.remaining(); + } + + @Override + public String toString() { + return "BYTES"; + } + + @Override + public ByteBuffer validate(Object item) { + if (item instanceof ByteBuffer) + return (ByteBuffer) item; + else + throw new 
SchemaException(item + " is not a java.nio.ByteBuffer."); + } + }; + +} http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/record/ByteBufferInputStream.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/record/ByteBufferInputStream.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/record/ByteBufferInputStream.java new file mode 100644 index 0000000..99a20a3 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/record/ByteBufferInputStream.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * A byte buffer backed input stream. Reading from the stream advances the position of the
 * wrapped buffer; the buffer is not copied.
 */
public class ByteBufferInputStream extends InputStream {

    private final ByteBuffer buffer;

    /**
     * @param buffer The buffer to read from, starting at its current position
     */
    public ByteBufferInputStream(ByteBuffer buffer) {
        this.buffer = buffer;
    }

    /**
     * Read the next byte.
     *
     * @return The byte as a value in the range 0..255, or -1 if the buffer has no bytes remaining
     */
    @Override
    public int read() {
        if (!buffer.hasRemaining()) {
            return -1;
        }
        // Mask to avoid sign extension: a byte of 0xFF must not be mistaken for end of stream.
        return buffer.get() & 0xFF;
    }

    /**
     * Read up to {@code len} bytes into {@code bytes} starting at {@code off}.
     *
     * @return The number of bytes read, 0 if {@code len} is 0, or -1 if the buffer has no bytes remaining
     */
    @Override
    public int read(byte[] bytes, int off, int len) {
        // InputStream contract: a zero-length read returns 0, even at end of stream.
        if (len == 0) {
            return 0;
        }
        if (!buffer.hasRemaining()) {
            return -1;
        }

        int count = Math.min(len, buffer.remaining());
        buffer.get(bytes, off, count);
        return count;
    }
}
Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.kafka_backport.common.record; + +import java.io.OutputStream; +import java.nio.ByteBuffer; + +// ---------------------------------------------------------------------------- +// This class is copied from the Apache Kafka project. +// +// The class is part of a "backport" of the new consumer API, in order to +// give Flink access to its functionality until the API is properly released. +// +// This is a temporary workaround! 
/**
 * A byte buffer backed output stream. When the current buffer is too small for a write, it is
 * replaced by a larger heap buffer holding the bytes written so far.
 */
public class ByteBufferOutputStream extends OutputStream {

    private static final float REALLOCATION_FACTOR = 1.1f;

    private ByteBuffer buffer;

    /**
     * @param buffer The initial buffer to write into, starting at its current position
     */
    public ByteBufferOutputStream(ByteBuffer buffer) {
        this.buffer = buffer;
    }

    /**
     * Write the low-order byte of {@code b}, growing the buffer first if it is full.
     */
    @Override
    public void write(int b) {
        if (buffer.remaining() < 1) {
            expandBuffer(buffer.capacity() + 1);
        }
        buffer.put((byte) b);
    }

    /**
     * Write {@code len} bytes of {@code bytes} starting at {@code off}, growing the buffer first
     * if they do not fit.
     */
    @Override
    public void write(byte[] bytes, int off, int len) {
        if (buffer.remaining() < len) {
            expandBuffer(buffer.capacity() + len);
        }
        buffer.put(bytes, off, len);
    }

    /**
     * @return The buffer currently backing this stream. After a reallocation this is a different
     *         buffer than the one passed to the constructor.
     */
    public ByteBuffer buffer() {
        return buffer;
    }

    /**
     * Replace the buffer with a larger one that holds everything written so far.
     *
     * @param size The minimum capacity of the new buffer
     */
    private void expandBuffer(int size) {
        int expandSize = Math.max((int) (buffer.capacity() * REALLOCATION_FACTOR), size);
        ByteBuffer temp = ByteBuffer.allocate(expandSize);
        // Copy through a flipped duplicate rather than buffer.array(): direct buffers have no
        // accessible backing array, and the duplicate leaves the old buffer's position untouched.
        ByteBuffer written = buffer.duplicate();
        written.flip();
        temp.put(written);
        buffer = temp;
    }
}
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.kafka_backport.common.record; + +// ---------------------------------------------------------------------------- +// This class is copied from the Apache Kafka project. +// +// The class is part of a "backport" of the new consumer API, in order to +// give Flink access to its functionality until the API is properly released. +// +// This is a temporary workaround! 
/**
 * The supported compression types, each with a numeric id, a lower-case name and a rate value.
 */
public enum CompressionType {
    NONE(0, "none", 1.0f),
    GZIP(1, "gzip", 0.5f),
    SNAPPY(2, "snappy", 0.5f),
    LZ4(3, "lz4", 0.5f);

    public final int id;
    public final String name;
    public final float rate;

    CompressionType(int id, String name, float rate) {
        this.id = id;
        this.name = name;
        this.rate = rate;
    }

    /**
     * Look up a compression type by its numeric id.
     *
     * @param id The id of the compression type
     * @return The matching compression type
     * @throws IllegalArgumentException If no compression type has the given id
     */
    public static CompressionType forId(int id) {
        for (CompressionType candidate : values()) {
            if (candidate.id == id) {
                return candidate;
            }
        }
        throw new IllegalArgumentException("Unknown compression type id: " + id);
    }

    /**
     * Look up a compression type by its name (exact, case-sensitive match).
     *
     * @param name The name of the compression type
     * @return The matching compression type
     * @throws IllegalArgumentException If no compression type has the given name
     */
    public static CompressionType forName(String name) {
        for (CompressionType candidate : values()) {
            // Constant-first equals keeps a null argument on the "unknown name" path.
            if (candidate.name.equals(name)) {
                return candidate;
            }
        }
        throw new IllegalArgumentException("Unknown compression name: " + name);
    }

}