Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 9E591200BCE for ; Fri, 2 Dec 2016 14:34:55 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 9D326160B4A; Fri, 2 Dec 2016 13:34:55 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id E3E42160B2E for ; Fri, 2 Dec 2016 14:34:52 +0100 (CET) Received: (qmail 69581 invoked by uid 500); 2 Dec 2016 13:34:52 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 69178 invoked by uid 99); 2 Dec 2016 13:34:51 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 02 Dec 2016 13:34:51 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 4C49DF17D7; Fri, 2 Dec 2016 13:34:51 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: fhueske@apache.org To: commits@flink.apache.org Date: Fri, 02 Dec 2016 13:34:57 -0000 Message-Id: <589a8819320e43d8a47abe996a981313@git.apache.org> In-Reply-To: <627ed550fa9e4aed8e3752516869c22e@git.apache.org> References: <627ed550fa9e4aed8e3752516869c22e@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [07/51] [abbrv] [partial] flink git commit: [FLINK-4676] [connectors] Merge batch and streaming connectors into common Maven module. 
archived-at: Fri, 02 Dec 2016 13:34:55 -0000 http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java deleted file mode 100644 index cf39606..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java +++ /dev/null @@ -1,552 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.connectors.kafka.internals; - -import org.apache.flink.metrics.Gauge; -import org.apache.flink.metrics.MetricGroup; -import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; -import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; -import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; -import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback; -import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; -import org.apache.flink.util.SerializedValue; - -import java.io.IOException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * Base class for all fetchers, which implement the connections to Kafka brokers and - * pull records from Kafka partitions. - * - *

This fetcher base class implements the logic around emitting records and tracking offsets, - * as well as around the optional timestamp assignment and watermark generation. - * - * @param The type of elements deserialized from Kafka's byte records, and emitted into - * the Flink data streams. - * @param The type of topic/partition identifier used by Kafka in the specific version. - */ -public abstract class AbstractFetcher { - - protected static final int NO_TIMESTAMPS_WATERMARKS = 0; - protected static final int PERIODIC_WATERMARKS = 1; - protected static final int PUNCTUATED_WATERMARKS = 2; - - // ------------------------------------------------------------------------ - - /** The source context to emit records and watermarks to */ - protected final SourceContext sourceContext; - - /** The lock that guarantees that record emission and state updates are atomic, - * from the view of taking a checkpoint */ - protected final Object checkpointLock; - - /** All partitions (and their state) that this fetcher is subscribed to */ - private final KafkaTopicPartitionState[] allPartitions; - - /** The mode describing whether the fetcher also generates timestamps and watermarks */ - protected final int timestampWatermarkMode; - - /** Flag whether to register metrics for the fetcher */ - protected final boolean useMetrics; - - /** Only relevant for punctuated watermarks: The current cross partition watermark */ - private volatile long maxWatermarkSoFar = Long.MIN_VALUE; - - // ------------------------------------------------------------------------ - - protected AbstractFetcher( - SourceContext sourceContext, - List assignedPartitions, - SerializedValue> watermarksPeriodic, - SerializedValue> watermarksPunctuated, - ProcessingTimeService processingTimeProvider, - long autoWatermarkInterval, - ClassLoader userCodeClassLoader, - boolean useMetrics) throws Exception - { - this.sourceContext = checkNotNull(sourceContext); - this.checkpointLock = sourceContext.getCheckpointLock(); 
- this.useMetrics = useMetrics; - - // figure out what we watermark mode we will be using - - if (watermarksPeriodic == null) { - if (watermarksPunctuated == null) { - // simple case, no watermarks involved - timestampWatermarkMode = NO_TIMESTAMPS_WATERMARKS; - } else { - timestampWatermarkMode = PUNCTUATED_WATERMARKS; - } - } else { - if (watermarksPunctuated == null) { - timestampWatermarkMode = PERIODIC_WATERMARKS; - } else { - throw new IllegalArgumentException("Cannot have both periodic and punctuated watermarks"); - } - } - - // create our partition state according to the timestamp/watermark mode - this.allPartitions = initializePartitions( - assignedPartitions, - timestampWatermarkMode, - watermarksPeriodic, watermarksPunctuated, - userCodeClassLoader); - - // if we have periodic watermarks, kick off the interval scheduler - if (timestampWatermarkMode == PERIODIC_WATERMARKS) { - KafkaTopicPartitionStateWithPeriodicWatermarks[] parts = - (KafkaTopicPartitionStateWithPeriodicWatermarks[]) allPartitions; - - PeriodicWatermarkEmitter periodicEmitter = - new PeriodicWatermarkEmitter(parts, sourceContext, processingTimeProvider, autoWatermarkInterval); - periodicEmitter.start(); - } - } - - // ------------------------------------------------------------------------ - // Properties - // ------------------------------------------------------------------------ - - /** - * Gets all partitions (with partition state) that this fetcher is subscribed to. - * - * @return All subscribed partitions. 
- */ - protected final KafkaTopicPartitionState[] subscribedPartitions() { - return allPartitions; - } - - // ------------------------------------------------------------------------ - // Core fetcher work methods - // ------------------------------------------------------------------------ - - public abstract void runFetchLoop() throws Exception; - - public abstract void cancel(); - - // ------------------------------------------------------------------------ - // Kafka version specifics - // ------------------------------------------------------------------------ - - /** - * Creates the Kafka version specific representation of the given - * topic partition. - * - * @param partition The Flink representation of the Kafka topic partition. - * @return The specific Kafka representation of the Kafka topic partition. - */ - public abstract KPH createKafkaPartitionHandle(KafkaTopicPartition partition); - - /** - * Commits the given partition offsets to the Kafka brokers (or to ZooKeeper for - * older Kafka versions). The given offsets are the internal checkpointed offsets, representing - * the last processed record of each partition. Version-specific implementations of this method - * need to hold the contract that the given offsets must be incremented by 1 before - * committing them, so that committed offsets to Kafka represent "the next record to process". - * - * @param offsets The offsets to commit to Kafka (implementations must increment offsets by 1 before committing). - * @throws Exception This method forwards exceptions. - */ - public abstract void commitInternalOffsetsToKafka(Map offsets) throws Exception; - - // ------------------------------------------------------------------------ - // snapshot and restore the state - // ------------------------------------------------------------------------ - - /** - * Takes a snapshot of the partition offsets. - * - *

Important: This method mus be called under the checkpoint lock. - * - * @return A map from partition to current offset. - */ - public HashMap snapshotCurrentState() { - // this method assumes that the checkpoint lock is held - assert Thread.holdsLock(checkpointLock); - - HashMap state = new HashMap<>(allPartitions.length); - for (KafkaTopicPartitionState partition : subscribedPartitions()) { - state.put(partition.getKafkaTopicPartition(), partition.getOffset()); - } - return state; - } - - /** - * Restores the partition offsets. - * - * @param snapshotState The offsets for the partitions - */ - public void restoreOffsets(HashMap snapshotState) { - for (KafkaTopicPartitionState partition : allPartitions) { - Long offset = snapshotState.get(partition.getKafkaTopicPartition()); - if (offset != null) { - partition.setOffset(offset); - } - } - } - - // ------------------------------------------------------------------------ - // emitting records - // ------------------------------------------------------------------------ - - /** - * Emits a record without attaching an existing timestamp to it. - * - *

Implementation Note: This method is kept brief to be JIT inlining friendly. - * That makes the fast path efficient, the extended paths are called as separate methods. - * - * @param record The record to emit - * @param partitionState The state of the Kafka partition from which the record was fetched - * @param offset The offset of the record - */ - protected void emitRecord(T record, KafkaTopicPartitionState partitionState, long offset) throws Exception { - if (timestampWatermarkMode == NO_TIMESTAMPS_WATERMARKS) { - // fast path logic, in case there are no watermarks - - // emit the record, using the checkpoint lock to guarantee - // atomicity of record emission and offset state update - synchronized (checkpointLock) { - sourceContext.collect(record); - partitionState.setOffset(offset); - } - } - else if (timestampWatermarkMode == PERIODIC_WATERMARKS) { - emitRecordWithTimestampAndPeriodicWatermark(record, partitionState, offset, Long.MIN_VALUE); - } - else { - emitRecordWithTimestampAndPunctuatedWatermark(record, partitionState, offset, Long.MIN_VALUE); - } - } - - /** - * Emits a record attaching a timestamp to it. - * - *

Implementation Note: This method is kept brief to be JIT inlining friendly. - * That makes the fast path efficient, the extended paths are called as separate methods. - * - * @param record The record to emit - * @param partitionState The state of the Kafka partition from which the record was fetched - * @param offset The offset of the record - */ - protected void emitRecordWithTimestamp( - T record, KafkaTopicPartitionState partitionState, long offset, long timestamp) throws Exception { - - if (timestampWatermarkMode == NO_TIMESTAMPS_WATERMARKS) { - // fast path logic, in case there are no watermarks generated in the fetcher - - // emit the record, using the checkpoint lock to guarantee - // atomicity of record emission and offset state update - synchronized (checkpointLock) { - sourceContext.collectWithTimestamp(record, timestamp); - partitionState.setOffset(offset); - } - } - else if (timestampWatermarkMode == PERIODIC_WATERMARKS) { - emitRecordWithTimestampAndPeriodicWatermark(record, partitionState, offset, timestamp); - } - else { - emitRecordWithTimestampAndPunctuatedWatermark(record, partitionState, offset, timestamp); - } - } - - /** - * Record emission, if a timestamp will be attached from an assigner that is - * also a periodic watermark generator. - */ - protected void emitRecordWithTimestampAndPeriodicWatermark( - T record, KafkaTopicPartitionState partitionState, long offset, long kafkaEventTimestamp) - { - @SuppressWarnings("unchecked") - final KafkaTopicPartitionStateWithPeriodicWatermarks withWatermarksState = - (KafkaTopicPartitionStateWithPeriodicWatermarks) partitionState; - - // extract timestamp - this accesses/modifies the per-partition state inside the - // watermark generator instance, so we need to lock the access on the - // partition state. 
concurrent access can happen from the periodic emitter - final long timestamp; - //noinspection SynchronizationOnLocalVariableOrMethodParameter - synchronized (withWatermarksState) { - timestamp = withWatermarksState.getTimestampForRecord(record, kafkaEventTimestamp); - } - - // emit the record with timestamp, using the usual checkpoint lock to guarantee - // atomicity of record emission and offset state update - synchronized (checkpointLock) { - sourceContext.collectWithTimestamp(record, timestamp); - partitionState.setOffset(offset); - } - } - - /** - * Record emission, if a timestamp will be attached from an assigner that is - * also a punctuated watermark generator. - */ - protected void emitRecordWithTimestampAndPunctuatedWatermark( - T record, KafkaTopicPartitionState partitionState, long offset, long kafkaEventTimestamp) - { - @SuppressWarnings("unchecked") - final KafkaTopicPartitionStateWithPunctuatedWatermarks withWatermarksState = - (KafkaTopicPartitionStateWithPunctuatedWatermarks) partitionState; - - // only one thread ever works on accessing timestamps and watermarks - // from the punctuated extractor - final long timestamp = withWatermarksState.getTimestampForRecord(record, kafkaEventTimestamp); - final Watermark newWatermark = withWatermarksState.checkAndGetNewWatermark(record, timestamp); - - // emit the record with timestamp, using the usual checkpoint lock to guarantee - // atomicity of record emission and offset state update - synchronized (checkpointLock) { - sourceContext.collectWithTimestamp(record, timestamp); - partitionState.setOffset(offset); - } - - // if we also have a new per-partition watermark, check if that is also a - // new cross-partition watermark - if (newWatermark != null) { - updateMinPunctuatedWatermark(newWatermark); - } - } - - /** - *Checks whether a new per-partition watermark is also a new cross-partition watermark. 
- */ - private void updateMinPunctuatedWatermark(Watermark nextWatermark) { - if (nextWatermark.getTimestamp() > maxWatermarkSoFar) { - long newMin = Long.MAX_VALUE; - - for (KafkaTopicPartitionState state : allPartitions) { - @SuppressWarnings("unchecked") - final KafkaTopicPartitionStateWithPunctuatedWatermarks withWatermarksState = - (KafkaTopicPartitionStateWithPunctuatedWatermarks) state; - - newMin = Math.min(newMin, withWatermarksState.getCurrentPartitionWatermark()); - } - - // double-check locking pattern - if (newMin > maxWatermarkSoFar) { - synchronized (checkpointLock) { - if (newMin > maxWatermarkSoFar) { - maxWatermarkSoFar = newMin; - sourceContext.emitWatermark(new Watermark(newMin)); - } - } - } - } - } - - // ------------------------------------------------------------------------ - // Utilities - // ------------------------------------------------------------------------ - - /** - * Utility method that takes the topic partitions and creates the topic partition state - * holders. If a watermark generator per partition exists, this will also initialize those. 
- */ - private KafkaTopicPartitionState[] initializePartitions( - List assignedPartitions, - int timestampWatermarkMode, - SerializedValue> watermarksPeriodic, - SerializedValue> watermarksPunctuated, - ClassLoader userCodeClassLoader) - throws IOException, ClassNotFoundException - { - switch (timestampWatermarkMode) { - - case NO_TIMESTAMPS_WATERMARKS: { - @SuppressWarnings("unchecked") - KafkaTopicPartitionState[] partitions = - (KafkaTopicPartitionState[]) new KafkaTopicPartitionState[assignedPartitions.size()]; - - int pos = 0; - for (KafkaTopicPartition partition : assignedPartitions) { - // create the kafka version specific partition handle - KPH kafkaHandle = createKafkaPartitionHandle(partition); - partitions[pos++] = new KafkaTopicPartitionState<>(partition, kafkaHandle); - } - - return partitions; - } - - case PERIODIC_WATERMARKS: { - @SuppressWarnings("unchecked") - KafkaTopicPartitionStateWithPeriodicWatermarks[] partitions = - (KafkaTopicPartitionStateWithPeriodicWatermarks[]) - new KafkaTopicPartitionStateWithPeriodicWatermarks[assignedPartitions.size()]; - - int pos = 0; - for (KafkaTopicPartition partition : assignedPartitions) { - KPH kafkaHandle = createKafkaPartitionHandle(partition); - - AssignerWithPeriodicWatermarks assignerInstance = - watermarksPeriodic.deserializeValue(userCodeClassLoader); - - partitions[pos++] = new KafkaTopicPartitionStateWithPeriodicWatermarks<>( - partition, kafkaHandle, assignerInstance); - } - - return partitions; - } - - case PUNCTUATED_WATERMARKS: { - @SuppressWarnings("unchecked") - KafkaTopicPartitionStateWithPunctuatedWatermarks[] partitions = - (KafkaTopicPartitionStateWithPunctuatedWatermarks[]) - new KafkaTopicPartitionStateWithPunctuatedWatermarks[assignedPartitions.size()]; - - int pos = 0; - for (KafkaTopicPartition partition : assignedPartitions) { - KPH kafkaHandle = createKafkaPartitionHandle(partition); - - AssignerWithPunctuatedWatermarks assignerInstance = - 
watermarksPunctuated.deserializeValue(userCodeClassLoader); - - partitions[pos++] = new KafkaTopicPartitionStateWithPunctuatedWatermarks<>( - partition, kafkaHandle, assignerInstance); - } - - return partitions; - } - default: - // cannot happen, add this as a guard for the future - throw new RuntimeException(); - } - } - - // ------------------------- Metrics ---------------------------------- - - /** - * Add current and committed offsets to metric group - * - * @param metricGroup The metric group to use - */ - protected void addOffsetStateGauge(MetricGroup metricGroup) { - // add current offsets to gage - MetricGroup currentOffsets = metricGroup.addGroup("current-offsets"); - MetricGroup committedOffsets = metricGroup.addGroup("committed-offsets"); - for (KafkaTopicPartitionState ktp: subscribedPartitions()) { - currentOffsets.gauge(ktp.getTopic() + "-" + ktp.getPartition(), new OffsetGauge(ktp, OffsetGaugeType.CURRENT_OFFSET)); - committedOffsets.gauge(ktp.getTopic() + "-" + ktp.getPartition(), new OffsetGauge(ktp, OffsetGaugeType.COMMITTED_OFFSET)); - } - } - - /** - * Gauge types - */ - private enum OffsetGaugeType { - CURRENT_OFFSET, - COMMITTED_OFFSET - } - - /** - * Gauge for getting the offset of a KafkaTopicPartitionState. - */ - private static class OffsetGauge implements Gauge { - - private final KafkaTopicPartitionState ktp; - private final OffsetGaugeType gaugeType; - - public OffsetGauge(KafkaTopicPartitionState ktp, OffsetGaugeType gaugeType) { - this.ktp = ktp; - this.gaugeType = gaugeType; - } - - @Override - public Long getValue() { - switch(gaugeType) { - case COMMITTED_OFFSET: - return ktp.getCommittedOffset(); - case CURRENT_OFFSET: - return ktp.getOffset(); - default: - throw new RuntimeException("Unknown gauge type: " + gaugeType); - } - } - } - // ------------------------------------------------------------------------ - - /** - * The periodic watermark emitter. 
In its given interval, it checks all partitions for - * the current event time watermark, and possibly emits the next watermark. - */ - private static class PeriodicWatermarkEmitter implements ProcessingTimeCallback { - - private final KafkaTopicPartitionStateWithPeriodicWatermarks[] allPartitions; - - private final SourceContext emitter; - - private final ProcessingTimeService timerService; - - private final long interval; - - private long lastWatermarkTimestamp; - - //------------------------------------------------- - - PeriodicWatermarkEmitter( - KafkaTopicPartitionStateWithPeriodicWatermarks[] allPartitions, - SourceContext emitter, - ProcessingTimeService timerService, - long autoWatermarkInterval) - { - this.allPartitions = checkNotNull(allPartitions); - this.emitter = checkNotNull(emitter); - this.timerService = checkNotNull(timerService); - this.interval = autoWatermarkInterval; - this.lastWatermarkTimestamp = Long.MIN_VALUE; - } - - //------------------------------------------------- - - public void start() { - timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this); - } - - @Override - public void onProcessingTime(long timestamp) throws Exception { - - long minAcrossAll = Long.MAX_VALUE; - for (KafkaTopicPartitionStateWithPeriodicWatermarks state : allPartitions) { - - // we access the current watermark for the periodic assigners under the state - // lock, to prevent concurrent modification to any internal variables - final long curr; - //noinspection SynchronizationOnLocalVariableOrMethodParameter - synchronized (state) { - curr = state.getCurrentWatermarkTimestamp(); - } - - minAcrossAll = Math.min(minAcrossAll, curr); - } - - // emit next watermark, if there is one - if (minAcrossAll > lastWatermarkTimestamp) { - lastWatermarkTimestamp = minAcrossAll; - emitter.emitWatermark(new Watermark(minAcrossAll)); - } - - // schedule the next watermark - timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, 
this); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java deleted file mode 100644 index c736493..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka.internals; - -import javax.annotation.Nullable; -import java.util.concurrent.atomic.AtomicReference; - -/** - * A proxy that communicates exceptions between threads. Typically used if an exception - * from a spawned thread needs to be recognized by the "parent" (spawner) thread. - * - *

The spawned thread would set the exception via {@link #reportError(Throwable)}. - * The parent would check (at certain points) for exceptions via {@link #checkAndThrowException()}. - * Optionally, the parent can pass itself in the constructor to be interrupted as soon as - * an exception occurs. - * - *

- * {@code
- * 
- * final ExceptionProxy errorProxy = new ExceptionProxy(Thread.currentThread());
- * 
- * Thread subThread = new Thread() {
- * 
- *     public void run() {
- *         try {
- *             doSomething();
- *         } catch (Throwable t) {
- *             errorProxy.reportError(t);
- *         } finally {
- *             doSomeCleanup();
- *         }
- *     }
- * };
- * subThread.start();
- * 
- * doSomethingElse();
- * errorProxy.checkAndThrowException();
- * 
- * doSomethingMore();
- * errorProxy.checkAndThrowException();
- * 
- * try {
- *     subThread.join();
- * } catch (InterruptedException e) {
- *     errorProxy.checkAndThrowException();
- *     // restore interrupted status, if not caused by an exception
- *     Thread.currentThread().interrupt();
- * }
- * }
- * 
- */ -public class ExceptionProxy { - - /** The thread that should be interrupted when an exception occurs */ - private final Thread toInterrupt; - - /** The exception to throw */ - private final AtomicReference exception; - - /** - * Creates an exception proxy that interrupts the given thread upon - * report of an exception. The thread to interrupt may be null. - * - * @param toInterrupt The thread to interrupt upon an exception. May be null. - */ - public ExceptionProxy(@Nullable Thread toInterrupt) { - this.toInterrupt = toInterrupt; - this.exception = new AtomicReference<>(); - } - - // ------------------------------------------------------------------------ - - /** - * Sets the exception and interrupts the target thread, - * if no other exception has occurred so far. - * - *

The exception is only set (and the interruption is only triggered), - * if no other exception was set before. - * - * @param t The exception that occurred - */ - public void reportError(Throwable t) { - // set the exception, if it is the first (and the exception is non null) - if (t != null && exception.compareAndSet(null, t) && toInterrupt != null) { - toInterrupt.interrupt(); - } - } - - /** - * Checks whether an exception has been set via {@link #reportError(Throwable)}. - * If yes, that exception if re-thrown by this method. - * - * @throws Exception This method re-throws the exception, if set. - */ - public void checkAndThrowException() throws Exception { - Throwable t = exception.get(); - if (t != null) { - if (t instanceof Exception) { - throw (Exception) t; - } - else if (t instanceof Error) { - throw (Error) t; - } - else { - throw new Exception(t); - } - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java deleted file mode 100644 index c68fe28..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka.internals; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - -import static java.util.Objects.requireNonNull; - -/** - * Flink's description of a partition in a Kafka topic. - * Serializable, and common across all Kafka consumer subclasses (0.8, 0.9, ...) - * - *

Note: This class must not change in its structure, because it would change the - * serialization format and make previous savepoints unreadable. - */ -public final class KafkaTopicPartition implements Serializable { - - /** THIS SERIAL VERSION UID MUST NOT CHANGE, BECAUSE IT WOULD BREAK - * READING OLD SERIALIZED INSTANCES FROM SAVEPOINTS */ - private static final long serialVersionUID = 722083576322742325L; - - // ------------------------------------------------------------------------ - - private final String topic; - private final int partition; - private final int cachedHash; - - public KafkaTopicPartition(String topic, int partition) { - this.topic = requireNonNull(topic); - this.partition = partition; - this.cachedHash = 31 * topic.hashCode() + partition; - } - - // ------------------------------------------------------------------------ - - public String getTopic() { - return topic; - } - - public int getPartition() { - return partition; - } - - // ------------------------------------------------------------------------ - - @Override - public String toString() { - return "KafkaTopicPartition{" + - "topic='" + topic + '\'' + - ", partition=" + partition + - '}'; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - else if (o instanceof KafkaTopicPartition) { - KafkaTopicPartition that = (KafkaTopicPartition) o; - return this.partition == that.partition && this.topic.equals(that.topic); - } - else { - return false; - } - } - - @Override - public int hashCode() { - return cachedHash; - } - - // ------------------------------------------------------------------------ - // Utilities - // ------------------------------------------------------------------------ - - public static String toString(Map map) { - StringBuilder sb = new StringBuilder(); - for (Map.Entry p: map.entrySet()) { - KafkaTopicPartition ktp = p.getKey(); - 
sb.append(ktp.getTopic()).append(":").append(ktp.getPartition()).append("=").append(p.getValue()).append(", "); - } - return sb.toString(); - } - - public static String toString(List partitions) { - StringBuilder sb = new StringBuilder(); - for (KafkaTopicPartition p: partitions) { - sb.append(p.getTopic()).append(":").append(p.getPartition()).append(", "); - } - return sb.toString(); - } - - - public static List dropLeaderData(List partitionInfos) { - List ret = new ArrayList<>(partitionInfos.size()); - for(KafkaTopicPartitionLeader ktpl: partitionInfos) { - ret.add(ktpl.getTopicPartition()); - } - return ret; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionLeader.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionLeader.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionLeader.java deleted file mode 100644 index 1959a05..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionLeader.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka.internals; - -import org.apache.kafka.common.Node; - -import java.io.Serializable; - -/** - * Serializable Topic Partition info with leader Node information. - * This class is used at runtime. - */ -public class KafkaTopicPartitionLeader implements Serializable { - - private static final long serialVersionUID = 9145855900303748582L; - - private final int leaderId; - private final int leaderPort; - private final String leaderHost; - private final KafkaTopicPartition topicPartition; - private final int cachedHash; - - public KafkaTopicPartitionLeader(KafkaTopicPartition topicPartition, Node leader) { - this.topicPartition = topicPartition; - if (leader == null) { - this.leaderId = -1; - this.leaderHost = null; - this.leaderPort = -1; - } else { - this.leaderId = leader.id(); - this.leaderPort = leader.port(); - this.leaderHost = leader.host(); - } - int cachedHash = (leader == null) ? 
14 : leader.hashCode(); - this.cachedHash = 31 * cachedHash + topicPartition.hashCode(); - } - - public KafkaTopicPartition getTopicPartition() { - return topicPartition; - } - - public Node getLeader() { - if (this.leaderId == -1) { - return null; - } else { - return new Node(leaderId, leaderHost, leaderPort); - } - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (!(o instanceof KafkaTopicPartitionLeader)) { - return false; - } - - KafkaTopicPartitionLeader that = (KafkaTopicPartitionLeader) o; - - if (!topicPartition.equals(that.topicPartition)) { - return false; - } - return leaderId == that.leaderId && leaderPort == that.leaderPort && leaderHost.equals(that.leaderHost); - } - - @Override - public int hashCode() { - return cachedHash; - } - - @Override - public String toString() { - return "KafkaTopicPartitionLeader{" + - "leaderId=" + leaderId + - ", leaderPort=" + leaderPort + - ", leaderHost='" + leaderHost + '\'' + - ", topic=" + topicPartition.getTopic() + - ", partition=" + topicPartition.getPartition() + - '}'; - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java deleted file mode 100644 index 7cb5f46..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one 
or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka.internals; - -/** - * The state that the Flink Kafka Consumer holds for each Kafka partition. - * Includes the Kafka descriptor for partitions. - * - *

This class describes the most basic state (only the offset), subclasses - * define more elaborate state, containing current watermarks and timestamp - * extractors. - * - * @param The type of the Kafka partition descriptor, which varies across Kafka versions. - */ -public class KafkaTopicPartitionState { - - /** Magic number to define an unset offset. Negative offsets are not used by Kafka (invalid), - * and we pick a number that is probably (hopefully) not used by Kafka as a magic number for anything else. */ - public static final long OFFSET_NOT_SET = -915623761776L; - - // ------------------------------------------------------------------------ - - /** The Flink description of a Kafka partition */ - private final KafkaTopicPartition partition; - - /** The Kafka description of a Kafka partition (varies across different Kafka versions) */ - private final KPH kafkaPartitionHandle; - - /** The offset within the Kafka partition that we already processed */ - private volatile long offset; - - /** The offset of the Kafka partition that has been committed */ - private volatile long committedOffset; - - // ------------------------------------------------------------------------ - - public KafkaTopicPartitionState(KafkaTopicPartition partition, KPH kafkaPartitionHandle) { - this.partition = partition; - this.kafkaPartitionHandle = kafkaPartitionHandle; - this.offset = OFFSET_NOT_SET; - this.committedOffset = OFFSET_NOT_SET; - } - - // ------------------------------------------------------------------------ - - /** - * Gets Flink's descriptor for the Kafka Partition. - * @return The Flink partition descriptor. - */ - public final KafkaTopicPartition getKafkaTopicPartition() { - return partition; - } - - /** - * Gets Kafka's descriptor for the Kafka Partition. - * @return The Kafka partition descriptor. 
- */ - public final KPH getKafkaPartitionHandle() { - return kafkaPartitionHandle; - } - - public final String getTopic() { - return partition.getTopic(); - } - - public final int getPartition() { - return partition.getPartition(); - } - - /** - * The current offset in the partition. This refers to the offset last element that - * we retrieved and emitted successfully. It is the offset that should be stored in - * a checkpoint. - */ - public final long getOffset() { - return offset; - } - - public final void setOffset(long offset) { - this.offset = offset; - } - - public final boolean isOffsetDefined() { - return offset != OFFSET_NOT_SET; - } - - public final void setCommittedOffset(long offset) { - this.committedOffset = offset; - } - - public final long getCommittedOffset() { - return committedOffset; - } - - - // ------------------------------------------------------------------------ - - @Override - public String toString() { - return "Partition: " + partition + ", KafkaPartitionHandle=" + kafkaPartitionHandle - + ", offset=" + (isOffsetDefined() ? 
String.valueOf(offset) : "(not set)"); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java deleted file mode 100644 index efdc73f..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.connectors.kafka.internals; - -import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; -import org.apache.flink.streaming.api.watermark.Watermark; - -/** - * A special version of the per-kafka-partition-state that additionally holds - * a periodic watermark generator (and timestamp extractor) per partition. - * - * @param The type of records handled by the watermark generator - * @param The type of the Kafka partition descriptor, which varies across Kafka versions. - */ -public final class KafkaTopicPartitionStateWithPeriodicWatermarks extends KafkaTopicPartitionState { - - /** The timestamp assigner and watermark generator for the partition */ - private final AssignerWithPeriodicWatermarks timestampsAndWatermarks; - - /** The last watermark timestamp generated by this partition */ - private long partitionWatermark; - - // ------------------------------------------------------------------------ - - public KafkaTopicPartitionStateWithPeriodicWatermarks( - KafkaTopicPartition partition, KPH kafkaPartitionHandle, - AssignerWithPeriodicWatermarks timestampsAndWatermarks) - { - super(partition, kafkaPartitionHandle); - - this.timestampsAndWatermarks = timestampsAndWatermarks; - this.partitionWatermark = Long.MIN_VALUE; - } - - // ------------------------------------------------------------------------ - - public long getTimestampForRecord(T record, long kafkaEventTimestamp) { - return timestampsAndWatermarks.extractTimestamp(record, kafkaEventTimestamp); - } - - public long getCurrentWatermarkTimestamp() { - Watermark wm = timestampsAndWatermarks.getCurrentWatermark(); - if (wm != null) { - partitionWatermark = Math.max(partitionWatermark, wm.getTimestamp()); - } - return partitionWatermark; - } - - // ------------------------------------------------------------------------ - - @Override - public String toString() { - return "KafkaTopicPartitionStateWithPeriodicWatermarks: partition=" + 
getKafkaTopicPartition() - + ", offset=" + getOffset() + ", watermark=" + partitionWatermark; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java deleted file mode 100644 index edf40ce..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.connectors.kafka.internals; - -import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; -import org.apache.flink.streaming.api.watermark.Watermark; - -import javax.annotation.Nullable; - -/** - * A special version of the per-kafka-partition-state that additionally holds - * a punctuated watermark generator (and timestamp extractor) per partition. - * - *

This class is not thread safe, but it gives volatile access to the current - * partition watermark ({@link #getCurrentPartitionWatermark()}). - * - * @param The type of records handled by the watermark generator - * @param The type of the Kafka partition descriptor, which varies across Kafka versions - */ -public final class KafkaTopicPartitionStateWithPunctuatedWatermarks extends KafkaTopicPartitionState { - - /** The timestamp assigner and watermark generator for the partition */ - private final AssignerWithPunctuatedWatermarks timestampsAndWatermarks; - - /** The last watermark timestamp generated by this partition */ - private volatile long partitionWatermark; - - // ------------------------------------------------------------------------ - - public KafkaTopicPartitionStateWithPunctuatedWatermarks( - KafkaTopicPartition partition, KPH kafkaPartitionHandle, - AssignerWithPunctuatedWatermarks timestampsAndWatermarks) - { - super(partition, kafkaPartitionHandle); - - this.timestampsAndWatermarks = timestampsAndWatermarks; - this.partitionWatermark = Long.MIN_VALUE; - } - - // ------------------------------------------------------------------------ - - public long getTimestampForRecord(T record, long kafkaEventTimestamp) { - return timestampsAndWatermarks.extractTimestamp(record, kafkaEventTimestamp); - } - - @Nullable - public Watermark checkAndGetNewWatermark(T record, long timestamp) { - Watermark mark = timestampsAndWatermarks.checkAndGetNextWatermark(record, timestamp); - if (mark != null && mark.getTimestamp() > partitionWatermark) { - partitionWatermark = mark.getTimestamp(); - return mark; - } - else { - return null; - } - } - - public long getCurrentPartitionWatermark() { - return partitionWatermark; - } - - // ------------------------------------------------------------------------ - - @Override - public String toString() { - return "KafkaTopicPartitionStateWithPunctuatedWatermarks: partition=" + getKafkaTopicPartition() - + ", offset=" + getOffset() + ", 
watermark=" + partitionWatermark; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/TypeUtil.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/TypeUtil.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/TypeUtil.java deleted file mode 100644 index 7a41ade..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/TypeUtil.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.connectors.kafka.internals; - -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.typeutils.TypeExtractor; - -public class TypeUtil { - private TypeUtil() {} - - /** - * Creates TypeInformation array for an array of Classes. 
- * @param fieldTypes classes to extract type information from - * @return type information - */ - public static TypeInformation[] toTypeInfo(Class[] fieldTypes) { - TypeInformation[] typeInfos = new TypeInformation[fieldTypes.length]; - for (int i = 0; i < fieldTypes.length; i++) { - typeInfos[i] = TypeExtractor.getForClass(fieldTypes[i]); - } - return typeInfos; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricWrapper.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricWrapper.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricWrapper.java deleted file mode 100644 index cedb696..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricWrapper.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka.internals.metrics; - -import org.apache.flink.metrics.Gauge; - -/** - * Gauge for getting the current value of a Kafka metric. - */ -public class KafkaMetricWrapper implements Gauge { - private final org.apache.kafka.common.Metric kafkaMetric; - - public KafkaMetricWrapper(org.apache.kafka.common.Metric metric) { - this.kafkaMetric = metric; - } - - @Override - public Double getValue() { - return kafkaMetric.value(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java deleted file mode 100644 index 9b848e0..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.connectors.kafka.partitioner; - -import java.io.Serializable; - -/** - * A partitioner ensuring that each internal Flink partition ends up in one Kafka partition. - * - * Note, one Kafka partition can contain multiple Flink partitions. - * - * Cases: - * # More Flink partitions than kafka partitions - *

- * 		Flink Sinks:		Kafka Partitions
- * 			1	---------------->	1
- * 			2   --------------/
- * 			3   -------------/
- * 			4	------------/
- * 
- * Some (or all) Kafka partitions contain the output of more than one Flink partition. - * - * # Fewer Flink partitions than Kafka partitions - *
- * 		Flink Sinks:		Kafka Partitions
- * 			1	---------------->	1
- * 			2	---------------->	2
- * 										3
- * 										4
- * 										5
- * 
- * - * Not all Kafka partitions contain data - * To avoid such an unbalanced partitioning, use a round-robin kafka partitioner. (note that this will - * cause a lot of network connections between all the Flink instances and all the Kafka brokers - * - * - */ -public class FixedPartitioner extends KafkaPartitioner implements Serializable { - private static final long serialVersionUID = 1627268846962918126L; - - private int targetPartition = -1; - - @Override - public void open(int parallelInstanceId, int parallelInstances, int[] partitions) { - if (parallelInstanceId < 0 || parallelInstances <= 0 || partitions.length == 0) { - throw new IllegalArgumentException(); - } - - this.targetPartition = partitions[parallelInstanceId % partitions.length]; - } - - @Override - public int partition(T next, byte[] serializedKey, byte[] serializedValue, int numPartitions) { - if (targetPartition >= 0) { - return targetPartition; - } else { - throw new RuntimeException("The partitioner has not been initialized properly"); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java deleted file mode 100644 index 37e2ef6..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka.partitioner; - -import java.io.Serializable; - -/** - * It contains a open() method which is called on each parallel instance. - * Partitioners must be serializable! - */ -public abstract class KafkaPartitioner implements Serializable { - - private static final long serialVersionUID = -1974260817778593473L; - - /** - * Initializer for the Partitioner. - * @param parallelInstanceId 0-indexed id of the parallel instance in Flink - * @param parallelInstances the total number of parallel instances - * @param partitions an array describing the partition IDs of the available Kafka partitions. - */ - public void open(int parallelInstanceId, int parallelInstances, int[] partitions) { - // overwrite this method if needed. 
- } - - public abstract int partition(T next, byte[] serializedKey, byte[] serializedValue, int numPartitions); -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java deleted file mode 100644 index d170058..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.flink.streaming.util.serialization; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; - -import java.io.IOException; - - -/** - * DeserializationSchema that deserializes a JSON String into an ObjectNode. - *

- * Fields can be accessed by calling objectNode.get(<name>).as(<type>) - */ -public class JSONDeserializationSchema extends AbstractDeserializationSchema { - private ObjectMapper mapper; - - @Override - public ObjectNode deserialize(byte[] message) throws IOException { - if (mapper == null) { - mapper = new ObjectMapper(); - } - return mapper.readValue(message, ObjectNode.class); - } - - @Override - public boolean isEndOfStream(ObjectNode nextElement) { - return false; - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java deleted file mode 100644 index 261a111..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.util.serialization; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; -import org.apache.flink.api.common.typeinfo.TypeInformation; - -import java.io.IOException; - -import static org.apache.flink.api.java.typeutils.TypeExtractor.getForClass; - -/** - * DeserializationSchema that deserializes a JSON String into an ObjectNode. - *

- * Key fields can be accessed by calling objectNode.get("key").get(<name>).as(<type>) - *

- * Value fields can be accessed by calling objectNode.get("value").get(<name>).as(<type>) - *

- * Metadata fields can be accessed by calling objectNode.get("metadata").get(<name>).as(<type>) and include - * the "offset" (long), "topic" (String) and "partition" (int). - */ -public class JSONKeyValueDeserializationSchema implements KeyedDeserializationSchema { - private final boolean includeMetadata; - private ObjectMapper mapper; - - public JSONKeyValueDeserializationSchema(boolean includeMetadata) { - this.includeMetadata = includeMetadata; - } - - @Override - public ObjectNode deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException { - if (mapper == null) { - mapper = new ObjectMapper(); - } - ObjectNode node = mapper.createObjectNode(); - node.set("key", mapper.readValue(messageKey, JsonNode.class)); - node.set("value", mapper.readValue(message, JsonNode.class)); - if (includeMetadata) { - node.putObject("metadata") - .put("offset", offset) - .put("topic", topic) - .put("partition", partition); - } - return node; - } - - @Override - public boolean isEndOfStream(ObjectNode nextElement) { - return false; - } - - @Override - public TypeInformation getProducedType() { - return getForClass(ObjectNode.class); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java deleted file mode 100644 index 4344810..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java +++ /dev/null @@ 
-1,135 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.util.serialization; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.api.table.Row; -import org.apache.flink.api.table.typeutils.RowTypeInfo; -import org.apache.flink.util.Preconditions; - -import java.io.IOException; - -/** - * Deserialization schema from JSON to {@link Row}. - * - *

Deserializes the byte[] messages as a JSON object and reads - * the specified fields. - * - *

Failure during deserialization are forwarded as wrapped IOExceptions. - */ -public class JsonRowDeserializationSchema implements DeserializationSchema { - - /** Field names to parse. Indices match fieldTypes indices. */ - private final String[] fieldNames; - - /** Types to parse fields as. Indices match fieldNames indices. */ - private final TypeInformation[] fieldTypes; - - /** Object mapper for parsing the JSON. */ - private final ObjectMapper objectMapper = new ObjectMapper(); - - /** Flag indicating whether to fail on a missing field. */ - private boolean failOnMissingField; - - /** - * Creates a JSON deserialization schema for the given fields and type classes. - * - * @param fieldNames Names of JSON fields to parse. - * @param fieldTypes Type classes to parse JSON fields as. - */ - public JsonRowDeserializationSchema(String[] fieldNames, Class[] fieldTypes) { - this.fieldNames = Preconditions.checkNotNull(fieldNames, "Field names"); - - this.fieldTypes = new TypeInformation[fieldTypes.length]; - for (int i = 0; i < fieldTypes.length; i++) { - this.fieldTypes[i] = TypeExtractor.getForClass(fieldTypes[i]); - } - - Preconditions.checkArgument(fieldNames.length == fieldTypes.length, - "Number of provided field names and types does not match."); - } - - /** - * Creates a JSON deserialization schema for the given fields and types. - * - * @param fieldNames Names of JSON fields to parse. - * @param fieldTypes Types to parse JSON fields as. 
- */ - public JsonRowDeserializationSchema(String[] fieldNames, TypeInformation[] fieldTypes) { - this.fieldNames = Preconditions.checkNotNull(fieldNames, "Field names"); - this.fieldTypes = Preconditions.checkNotNull(fieldTypes, "Field types"); - - Preconditions.checkArgument(fieldNames.length == fieldTypes.length, - "Number of provided field names and types does not match."); - } - - @Override - public Row deserialize(byte[] message) throws IOException { - try { - JsonNode root = objectMapper.readTree(message); - - Row row = new Row(fieldNames.length); - for (int i = 0; i < fieldNames.length; i++) { - JsonNode node = root.get(fieldNames[i]); - - if (node == null) { - if (failOnMissingField) { - throw new IllegalStateException("Failed to find field with name '" - + fieldNames[i] + "'."); - } else { - row.setField(i, null); - } - } else { - // Read the value as specified type - Object value = objectMapper.treeToValue(node, fieldTypes[i].getTypeClass()); - row.setField(i, value); - } - } - - return row; - } catch (Throwable t) { - throw new IOException("Failed to deserialize JSON object.", t); - } - } - - @Override - public boolean isEndOfStream(Row nextElement) { - return false; - } - - @Override - public TypeInformation getProducedType() { - return new RowTypeInfo(fieldTypes); - } - - /** - * Configures the failure behaviour if a JSON field is missing. - * - *

By default, a missing field is ignored and the field is set to null. - * - * @param failOnMissingField Flag indicating whether to fail or not on a missing field. - */ - public void setFailOnMissingField(boolean failOnMissingField) { - this.failOnMissingField = failOnMissingField; - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java deleted file mode 100644 index 077ff13..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.flink.streaming.util.serialization; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; -import org.apache.flink.api.table.Row; -import org.apache.flink.util.Preconditions; - - -/** - * Serialization schema that serializes an object into a JSON bytes. - * - *

Serializes the input {@link Row} object into a JSON string and - * converts it into byte[]. - * - *

Result byte[] messages can be deserialized using - * {@link JsonRowDeserializationSchema}. - */ -public class JsonRowSerializationSchema implements SerializationSchema { - /** Fields names in the input Row object */ - private final String[] fieldNames; - /** Object mapper that is used to create output JSON objects */ - private static ObjectMapper mapper = new ObjectMapper(); - - /** - * Creates a JSON serialization schema for the given fields and types. - * - * @param fieldNames Names of JSON fields to parse. - */ - public JsonRowSerializationSchema(String[] fieldNames) { - this.fieldNames = Preconditions.checkNotNull(fieldNames); - } - - @Override - public byte[] serialize(Row row) { - if (row.productArity() != fieldNames.length) { - throw new IllegalStateException(String.format( - "Number of elements in the row %s is different from number of field names: %d", row, fieldNames.length)); - } - - ObjectNode objectNode = mapper.createObjectNode(); - - for (int i = 0; i < row.productArity(); i++) { - JsonNode node = mapper.valueToTree(row.productElement(i)); - objectNode.set(fieldNames[i], node); - } - - try { - return mapper.writeValueAsBytes(objectNode); - } catch (Exception e) { - throw new RuntimeException("Failed to serialize row", e); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java deleted file mode 100644 index 01e72ca..0000000 --- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.util.serialization; - -import org.apache.flink.api.java.typeutils.ResultTypeQueryable; - -import java.io.IOException; -import java.io.Serializable; - -/** - * The deserialization schema describes how to turn the byte key / value messages delivered by certain - * data sources (for example Apache Kafka) into data types (Java/Scala objects) that are - * processed by Flink. - * - * @param The type created by the keyed deserialization schema. - */ -public interface KeyedDeserializationSchema extends Serializable, ResultTypeQueryable { - - /** - * Deserializes the byte message. - * - * @param messageKey the key as a byte array (null if no key has been set) - * @param message The message, as a byte array. (null if the message was empty or deleted) - * @param partition The partition the message has originated from - * @param offset the offset of the message in the original source (for example the Kafka offset) @return The deserialized message as an object. 
- */ - T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException; - - /** - * Method to decide whether the element signals the end of the stream. If - * true is returned the element won't be emitted. - * - * @param nextElement The element to test for the end-of-stream signal. - * @return True, if the element signals end of stream, false otherwise. - */ - boolean isEndOfStream(T nextElement); -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java deleted file mode 100644 index 4b9dba2..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.util.serialization; - -import org.apache.flink.api.common.typeinfo.TypeInformation; - -import java.io.IOException; - -/** - * A simple wrapper for using the DeserializationSchema with the KeyedDeserializationSchema - * interface - * @param The type created by the deserialization schema. - */ -public class KeyedDeserializationSchemaWrapper implements KeyedDeserializationSchema { - - private static final long serialVersionUID = 2651665280744549932L; - - private final DeserializationSchema deserializationSchema; - - public KeyedDeserializationSchemaWrapper(DeserializationSchema deserializationSchema) { - this.deserializationSchema = deserializationSchema; - } - @Override - public T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException { - return deserializationSchema.deserialize(message); - } - - @Override - public boolean isEndOfStream(T nextElement) { - return deserializationSchema.isEndOfStream(nextElement); - } - - @Override - public TypeInformation getProducedType() { - return deserializationSchema.getProducedType(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java deleted file mode 100644 index 701281e..0000000 --- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.util.serialization; - -import java.io.Serializable; - -/** - * The serialization schema describes how to turn a data object into a different serialized - * representation. Most data sinks (for example Apache Kafka) require the data to be handed - * to them in a specific format (for example as byte strings). - * - * @param The type to be serialized. - */ -public interface KeyedSerializationSchema extends Serializable { - - /** - * Serializes the key of the incoming element to a byte array - * This method might return null if no key is available. 
- * - * @param element The incoming element to be serialized - * @return the key of the element as a byte array - */ - byte[] serializeKey(T element); - - - /** - * Serializes the value of the incoming element to a byte array - * - * @param element The incoming element to be serialized - * @return the value of the element as a byte array - */ - byte[] serializeValue(T element); - - /** - * Optional method to determine the target topic for the element - * - * @param element Incoming element to determine the target topic from - * @return null or the target topic - */ - String getTargetTopic(T element); -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java deleted file mode 100644 index 1b3e486..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.util.serialization; - -/** - * A simple wrapper for using the SerializationSchema with the KeyedDeserializationSchema - * interface - * @param The type to serialize - */ -public class KeyedSerializationSchemaWrapper implements KeyedSerializationSchema { - - private static final long serialVersionUID = 1351665280744549933L; - - private final SerializationSchema serializationSchema; - - public KeyedSerializationSchemaWrapper(SerializationSchema serializationSchema) { - this.serializationSchema = serializationSchema; - } - - @Override - public byte[] serializeKey(T element) { - return null; - } - - @Override - public byte[] serializeValue(T element) { - return serializationSchema.serialize(element); - } - - @Override - public String getTargetTopic(T element) { - return null; // we are never overriding the topic - } -}