flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject [03/51] [abbrv] [partial] flink git commit: [FLINK-4676] [connectors] Merge batch and streaming connectors into common Maven module.
Date Fri, 02 Dec 2016 13:34:53 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/resources/logback-test.xml b/flink-streaming-connectors/flink-connector-kafka-base/src/test/resources/logback-test.xml
deleted file mode 100644
index 45b3b92..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/resources/logback-test.xml
+++ /dev/null
@@ -1,30 +0,0 @@
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one
-  ~ or more contributor license agreements.  See the NOTICE file
-  ~ distributed with this work for additional information
-  ~ regarding copyright ownership.  The ASF licenses this file
-  ~ to you under the Apache License, Version 2.0 (the
-  ~ "License"); you may not use this file except in compliance
-  ~ with the License.  You may obtain a copy of the License at
-  ~
-  ~     http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-
-<configuration>
-    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
-        <encoder>
-            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
-        </encoder>
-    </appender>
-
-    <root level="WARN">
-        <appender-ref ref="STDOUT"/>
-    </root>
-    <logger name="org.apache.flink.streaming" level="WARN"/>
-</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kinesis/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/pom.xml b/flink-streaming-connectors/flink-connector-kinesis/pom.xml
deleted file mode 100644
index 29170ad..0000000
--- a/flink-streaming-connectors/flink-connector-kinesis/pom.xml
+++ /dev/null
@@ -1,164 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-	<modelVersion>4.0.0</modelVersion>
-
-	<parent>
-		<groupId>org.apache.flink</groupId>
-		<artifactId>flink-streaming-connectors</artifactId>
-		<version>1.2-SNAPSHOT</version>
-		<relativePath>..</relativePath>
-	</parent>
-
-	<artifactId>flink-connector-kinesis_2.10</artifactId>
-	<name>flink-connector-kinesis</name>
-	<properties>
-		<aws.sdk.version>1.10.71</aws.sdk.version>
-		<aws.kinesis-kcl.version>1.6.2</aws.kinesis-kcl.version>
-		<aws.kinesis-kpl.version>0.10.2</aws.kinesis-kpl.version>
-	</properties>
-
-	<packaging>jar</packaging>
-
-	<dependencies>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-java_2.10</artifactId>
-			<version>${project.version}</version>
-			<scope>provided</scope>
-		</dependency>
-
-		<dependency>
-			<groupId>com.google.guava</groupId>
-			<artifactId>guava</artifactId>
-			<version>${guava.version}</version>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-tests_2.10</artifactId>
-			<version>${project.version}</version>
-			<scope>test</scope>
-			<type>test-jar</type>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-test-utils_2.10</artifactId>
-			<version>${project.version}</version>
-			<scope>test</scope>
-		</dependency>
-
-		<!-- Note:
-			The below dependencies are licenced under the Amazon Software License.
-			Flink includes the "flink-connector-kinesis" only as an optional dependency for that reason.
-		-->
-		<dependency>
-			<groupId>com.amazonaws</groupId>
-			<artifactId>aws-java-sdk-kinesis</artifactId>
-			<version>${aws.sdk.version}</version>
-		</dependency>
-
-		<dependency>
-			<groupId>com.amazonaws</groupId>
-			<artifactId>amazon-kinesis-producer</artifactId>
-			<version>${aws.kinesis-kpl.version}</version>
-		</dependency>
-
-		<dependency>
-			<groupId>com.amazonaws</groupId>
-			<artifactId>amazon-kinesis-client</artifactId>
-			<version>${aws.kinesis-kcl.version}</version>
-			<!--
-				We're excluding the below from the KCL since we'll only be using the
-				com.amazonaws.services.kinesis.clientlibrary.types.UserRecord class, which will not need these dependencies.
-			-->
-			<exclusions>
-				<exclusion>
-					<groupId>com.amazonaws</groupId>
-					<artifactId>aws-java-sdk-dynamodb</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>com.amazonaws</groupId>
-					<artifactId>aws-java-sdk-cloudwatch</artifactId>
-				</exclusion>
-			</exclusions>
-		</dependency>
-
-	</dependencies>
-
-	<build>
-		<plugins>
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-jar-plugin</artifactId>
-				<executions>
-					<execution>
-						<goals>
-							<goal>test-jar</goal>
-						</goals>
-					</execution>
-				</executions>
-			</plugin>
-
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-shade-plugin</artifactId>
-				<executions>
-					<execution>
-						<id>shade-flink</id>
-						<phase>package</phase>
-						<goals>
-							<goal>shade</goal>
-						</goals>
-						<configuration>
-							<promoteTransitiveDependencies>true</promoteTransitiveDependencies>
-							<artifactSet combine.children="append">
-								<includes>
-									<include>com.amazonaws:*</include>
-									<include>com.google.protobuf:*</include>
-								</includes>
-							</artifactSet>
-							<relocations combine.children="override">
-								<!-- DO NOT RELOCATE GUAVA IN THIS PACKAGE -->
-								<relocation>
-									<pattern>org.objectweb.asm</pattern>
-									<shadedPattern>org.apache.flink.shaded.org.objectweb.asm</shadedPattern>
-								</relocation>
-								<relocation>
-									<pattern>com.google.protobuf</pattern>
-									<shadedPattern>org.apache.flink.kinesis.shaded.com.google.protobuf</shadedPattern>
-								</relocation>
-								<relocation>
-									<pattern>com.amazonaws</pattern>
-									<shadedPattern>org.apache.flink.kinesis.shaded.com.amazonaws</shadedPattern>
-								</relocation>
-							</relocations>
-						</configuration>
-					</execution>
-				</executions>
-			</plugin>
-		</plugins>
-	</build>
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
deleted file mode 100644
index a62dc10..0000000
--- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
+++ /dev/null
@@ -1,304 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kinesis;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
-import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
-import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
-import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
-import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
-import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper;
-import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.List;
-import java.util.Properties;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * The Flink Kinesis Consumer is an exactly-once parallel streaming data source that subscribes to multiple AWS Kinesis
- * streams within the same AWS service region, and can handle resharding of streams. Each subtask of the consumer is
- * responsible for fetching data records from multiple Kinesis shards. The number of shards fetched by each subtask will
- * change as shards are closed and created by Kinesis.
- *
- * <p>To leverage Flink's checkpointing mechanics for exactly-once streaming processing guarantees, the Flink Kinesis
- * consumer is implemented with the AWS Java SDK, instead of the officially recommended AWS Kinesis Client Library, for
- * low-level control on the management of stream state. The Flink Kinesis Connector also supports setting the initial
- * starting points of Kinesis streams, namely TRIM_HORIZON and LATEST.</p>
- *
- * @param <T> the type of data emitted
- */
-public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T>
-	implements CheckpointedAsynchronously<HashMap<KinesisStreamShard, SequenceNumber>>, ResultTypeQueryable<T> {
-
-	private static final long serialVersionUID = 4724006128720664870L;
-
-	private static final Logger LOG = LoggerFactory.getLogger(FlinkKinesisConsumer.class);
-
-	// ------------------------------------------------------------------------
-	//  Consumer properties
-	// ------------------------------------------------------------------------
-
-	/** The names of the Kinesis streams that we will be consuming from */
-	private final List<String> streams;
-
-	/** Properties to parametrize settings such as AWS service region, initial position in stream,
-	 * shard list retrieval behaviours, etc */
-	private final Properties configProps;
-
-	/** User supplied deserialization schema to convert Kinesis byte messages to Flink objects */
-	private final KinesisDeserializationSchema<T> deserializer;
-
-	// ------------------------------------------------------------------------
-	//  Runtime state
-	// ------------------------------------------------------------------------
-
-	/** Per-task fetcher for Kinesis data records, where each fetcher pulls data from one or more Kinesis shards */
-	private transient KinesisDataFetcher<T> fetcher;
-
-	/** The sequence numbers in the last state snapshot of this subtask */
-	private transient HashMap<KinesisStreamShard, SequenceNumber> lastStateSnapshot;
-
-	/** The sequence numbers to restore to upon restore from failure */
-	private transient HashMap<KinesisStreamShard, SequenceNumber> sequenceNumsToRestore;
-
-	private volatile boolean running = true;
-
-
-	// ------------------------------------------------------------------------
-	//  Constructors
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Creates a new Flink Kinesis Consumer.
-	 *
-	 * <p>The AWS credentials to be used, AWS region of the Kinesis streams, initial position to start streaming
-	 * from are configured with a {@link Properties} instance.</p>
-	 *
-	 * @param stream
-	 *           The single AWS Kinesis stream to read from.
-	 * @param deserializer
-	 *           The deserializer used to convert raw bytes of Kinesis records to Java objects (without key).
-	 * @param configProps
-	 *           The properties used to configure AWS credentials, AWS region, and initial starting position.
-	 */
-	public FlinkKinesisConsumer(String stream, DeserializationSchema<T> deserializer, Properties configProps) {
-		this(stream, new KinesisDeserializationSchemaWrapper<>(deserializer), configProps);
-	}
-
-	/**
-	 * Creates a new Flink Kinesis Consumer.
-	 *
-	 * <p>The AWS credentials to be used, AWS region of the Kinesis streams, initial position to start streaming
-	 * from are configured with a {@link Properties} instance.</p>
-	 *
-	 * @param stream
-	 *           The single AWS Kinesis stream to read from.
-	 * @param deserializer
-	 *           The keyed deserializer used to convert raw bytes of Kinesis records to Java objects.
-	 * @param configProps
-	 *           The properties used to configure AWS credentials, AWS region, and initial starting position.
-	 */
-	public FlinkKinesisConsumer(String stream, KinesisDeserializationSchema<T> deserializer, Properties configProps) {
-		this(Collections.singletonList(stream), deserializer, configProps);
-	}
-
-	/**
-	 * Creates a new Flink Kinesis Consumer.
-	 *
-	 * <p>The AWS credentials to be used, AWS region of the Kinesis streams, initial position to start streaming
-	 * from are configured with a {@link Properties} instance.</p>
-	 *
-	 * @param streams
-	 *           The AWS Kinesis streams to read from.
-	 * @param deserializer
-	 *           The keyed deserializer used to convert raw bytes of Kinesis records to Java objects.
-	 * @param configProps
-	 *           The properties used to configure AWS credentials, AWS region, and initial starting position.
-	 */
-	public FlinkKinesisConsumer(List<String> streams, KinesisDeserializationSchema<T> deserializer, Properties configProps) {
-		checkNotNull(streams, "streams can not be null");
-		checkArgument(streams.size() != 0, "must be consuming at least 1 stream");
-		checkArgument(!streams.contains(""), "stream names cannot be empty Strings");
-		this.streams = streams;
-
-		this.configProps = checkNotNull(configProps, "configProps can not be null");
-
-		// check the configuration properties for any conflicting settings
-		KinesisConfigUtil.validateConsumerConfiguration(this.configProps);
-
-		this.deserializer = checkNotNull(deserializer, "deserializer can not be null");
-
-		if (LOG.isInfoEnabled()) {
-			StringBuilder sb = new StringBuilder();
-			for (String stream : streams) {
-				sb.append(stream).append(", ");
-			}
-			LOG.info("Flink Kinesis Consumer is going to read the following streams: {}", sb.toString());
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  Source life cycle
-	// ------------------------------------------------------------------------
-
-	@Override
-	public void open(Configuration parameters) throws Exception {
-		super.open(parameters);
-
-		// restore to the last known sequence numbers from the latest complete snapshot
-		if (sequenceNumsToRestore != null) {
-			if (LOG.isInfoEnabled()) {
-				LOG.info("Subtask {} is restoring sequence numbers {} from previous checkpointed state",
-					getRuntimeContext().getIndexOfThisSubtask(), sequenceNumsToRestore.toString());
-			}
-
-			// initialize sequence numbers with restored state
-			lastStateSnapshot = sequenceNumsToRestore;
-		} else {
-			// start fresh with empty sequence numbers if there are no snapshots to restore from.
-			lastStateSnapshot = new HashMap<>();
-		}
-	}
-
-	@Override
-	public void run(SourceContext<T> sourceContext) throws Exception {
-
-		// all subtasks will run a fetcher, regardless of whether or not the subtask will initially have
-		// shards to subscribe to; fetchers will continuously poll for changes in the shard list, so all subtasks
-		// can potentially have new shards to subscribe to later on
-		fetcher = new KinesisDataFetcher<>(
-			streams, sourceContext, getRuntimeContext(), configProps, deserializer);
-
-		boolean isRestoringFromFailure = (sequenceNumsToRestore != null);
-		fetcher.setIsRestoringFromFailure(isRestoringFromFailure);
-
-		// if we are restoring from a checkpoint, we iterate over the restored
-		// state and accordingly seed the fetcher with subscribed shards states
-		if (isRestoringFromFailure) {
-			for (Map.Entry<KinesisStreamShard, SequenceNumber> restored : lastStateSnapshot.entrySet()) {
-				fetcher.advanceLastDiscoveredShardOfStream(
-					restored.getKey().getStreamName(), restored.getKey().getShard().getShardId());
-
-				if (LOG.isInfoEnabled()) {
-					LOG.info("Subtask {} is seeding the fetcher with restored shard {}," +
-							" starting state set to the restored sequence number {}",
-						getRuntimeContext().getIndexOfThisSubtask(), restored.getKey().toString(), restored.getValue());
-				}
-				fetcher.registerNewSubscribedShardState(
-					new KinesisStreamShardState(restored.getKey(), restored.getValue()));
-			}
-		}
-
-		// check that we are running before starting the fetcher
-		if (!running) {
-			return;
-		}
-
-		// start the fetcher loop. The fetcher will stop running only when cancel() or
-		// close() is called, or an error is thrown by threads created by the fetcher
-		fetcher.runFetcher();
-
-		// check that the fetcher has terminated before fully closing
-		fetcher.awaitTermination();
-		sourceContext.close();
-	}
-
-	@Override
-	public void cancel() {
-		running = false;
-
-		KinesisDataFetcher fetcher = this.fetcher;
-		this.fetcher = null;
-
-		// this method might be called before the subtask actually starts running,
-		// so we must check if the fetcher is actually created
-		if (fetcher != null) {
-			try {
-				// interrupt the fetcher of any work
-				fetcher.shutdownFetcher();
-				fetcher.awaitTermination();
-			} catch (Exception e) {
-				LOG.warn("Error while closing Kinesis data fetcher", e);
-			}
-		}
-	}
-
-	@Override
-	public void close() throws Exception {
-		cancel();
-		super.close();
-	}
-
-	@Override
-	public TypeInformation<T> getProducedType() {
-		return deserializer.getProducedType();
-	}
-
-	// ------------------------------------------------------------------------
-	//  State Snapshot & Restore
-	// ------------------------------------------------------------------------
-
-	@Override
-	public HashMap<KinesisStreamShard, SequenceNumber> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
-		if (lastStateSnapshot == null) {
-			LOG.debug("snapshotState() requested on not yet opened source; returning null.");
-			return null;
-		}
-
-		if (fetcher == null) {
-			LOG.debug("snapshotState() requested on not yet running source; returning null.");
-			return null;
-		}
-
-		if (!running) {
-			LOG.debug("snapshotState() called on closed source; returning null.");
-			return null;
-		}
-
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("Snapshotting state ...");
-		}
-
-		lastStateSnapshot = fetcher.snapshotState();
-
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("Snapshotted state, last processed sequence numbers: {}, checkpoint id: {}, timestamp: {}",
-				lastStateSnapshot.toString(), checkpointId, checkpointTimestamp);
-		}
-
-		return lastStateSnapshot;
-	}
-
-	@Override
-	public void restoreState(HashMap<KinesisStreamShard, SequenceNumber> restoredState) throws Exception {
-		sequenceNumsToRestore = restoredState;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
deleted file mode 100644
index 579bd6b..0000000
--- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
+++ /dev/null
@@ -1,292 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kinesis;
-
-import com.amazonaws.services.kinesis.producer.Attempt;
-import com.amazonaws.services.kinesis.producer.KinesisProducer;
-import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
-import com.amazonaws.services.kinesis.producer.UserRecordFailedException;
-import com.amazonaws.services.kinesis.producer.UserRecordResult;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import org.apache.flink.api.java.ClosureCleaner;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants;
-import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema;
-import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
-import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
-import org.apache.flink.util.PropertiesUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Objects;
-import java.util.Properties;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * The FlinkKinesisProducer allows producing records from a Flink DataStream into Kinesis.
- *
- * @param <OUT> Data type to produce into Kinesis Streams
- */
-public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> {
-
-	private static final Logger LOG = LoggerFactory.getLogger(FlinkKinesisProducer.class);
-
-	/** Properties to parametrize settings such as AWS service region, access key etc. */
-	private final Properties configProps;
-
-	/* Flag controlling the error behavior of the producer */
-	private boolean failOnError = false;
-
-	/* Name of the default stream to produce to. Can be overwritten by the serialization schema */
-	private String defaultStream;
-
-	/* Default partition id. Can be overwritten by the serialization schema */
-	private String defaultPartition;
-
-	/* Schema for turning the OUT type into a byte array. */
-	private final KinesisSerializationSchema<OUT> schema;
-
-	/* Optional custom partitioner */
-	private KinesisPartitioner<OUT> customPartitioner = null;
-
-
-	// --------------------------- Runtime fields ---------------------------
-
-
-	/* Our Kinesis instance for each parallel Flink sink */
-	private transient KinesisProducer producer;
-
-	/* Callback handling failures */
-	private transient FutureCallback<UserRecordResult> callback;
-
-	/* Field for async exception */
-	private transient volatile Throwable thrownException;
-
-
-	// --------------------------- Initialization and configuration  ---------------------------
-
-
-	/**
-	 * Create a new FlinkKinesisProducer.
-	 * This is a constructor supporting Flink's {@link SerializationSchema}.
-	 *
-	 * @param schema Serialization schema for the data type
-	 * @param configProps The properties used to configure AWS credentials and AWS region
-	 */
-	public FlinkKinesisProducer(final SerializationSchema<OUT> schema, Properties configProps) {
-
-		// create a simple wrapper for the serialization schema
-		this(new KinesisSerializationSchema<OUT>() {
-			@Override
-			public ByteBuffer serialize(OUT element) {
-				// wrap into ByteBuffer
-				return ByteBuffer.wrap(schema.serialize(element));
-			}
-			// use default stream and hash key
-			@Override
-			public String getTargetStream(OUT element) {
-				return null;
-			}
-		}, configProps);
-	}
-
-	/**
-	 * Create a new FlinkKinesisProducer.
-	 * This is a constructor supporting {@link KinesisSerializationSchema}.
-	 *
-	 * @param schema Kinesis serialization schema for the data type
-	 * @param configProps The properties used to configure AWS credentials and AWS region
-	 */
-	public FlinkKinesisProducer(KinesisSerializationSchema<OUT> schema, Properties configProps) {
-		this.configProps = checkNotNull(configProps, "configProps can not be null");
-
-		// check the configuration properties for any conflicting settings
-		KinesisConfigUtil.validateProducerConfiguration(this.configProps);
-
-		ClosureCleaner.ensureSerializable(Objects.requireNonNull(schema));
-		this.schema = schema;
-	}
-
-	/**
-	 * If set to true, the producer will immediately fail with an exception on any error.
-	 * Otherwise, the errors are logged and the producer goes on.
-	 *
-	 * @param failOnError Error behavior flag
-	 */
-	public void setFailOnError(boolean failOnError) {
-		this.failOnError = failOnError;
-	}
-
-	/**
-	 * Set a default stream name.
-	 * @param defaultStream Name of the default Kinesis stream
-	 */
-	public void setDefaultStream(String defaultStream) {
-		this.defaultStream = defaultStream;
-	}
-
-	/**
-	 * Set default partition id
-	 * @param defaultPartition Name of the default partition
-	 */
-	public void setDefaultPartition(String defaultPartition) {
-		this.defaultPartition = defaultPartition;
-	}
-
-	public void setCustomPartitioner(KinesisPartitioner<OUT> partitioner) {
-		Objects.requireNonNull(partitioner);
-		ClosureCleaner.ensureSerializable(partitioner);
-		this.customPartitioner = partitioner;
-	}
-
-
-	// --------------------------- Lifecycle methods ---------------------------
-
-
-	@Override
-	public void open(Configuration parameters) throws Exception {
-		super.open(parameters);
-
-		KinesisProducerConfiguration producerConfig = new KinesisProducerConfiguration();
-
-		producerConfig.setRegion(configProps.getProperty(ProducerConfigConstants.AWS_REGION));
-		producerConfig.setCredentialsProvider(AWSUtil.getCredentialsProvider(configProps));
-		if (configProps.containsKey(ProducerConfigConstants.COLLECTION_MAX_COUNT)) {
-			producerConfig.setCollectionMaxCount(PropertiesUtil.getLong(configProps,
-					ProducerConfigConstants.COLLECTION_MAX_COUNT, producerConfig.getCollectionMaxCount(), LOG));
-		}
-		if (configProps.containsKey(ProducerConfigConstants.AGGREGATION_MAX_COUNT)) {
-			producerConfig.setAggregationMaxCount(PropertiesUtil.getLong(configProps,
-					ProducerConfigConstants.AGGREGATION_MAX_COUNT, producerConfig.getAggregationMaxCount(), LOG));
-		}
-
-		producer = new KinesisProducer(producerConfig);
-		callback = new FutureCallback<UserRecordResult>() {
-			@Override
-			public void onSuccess(UserRecordResult result) {
-				if (!result.isSuccessful()) {
-					if(failOnError) {
-						thrownException = new RuntimeException("Record was not sent successful");
-					} else {
-						LOG.warn("Record was not sent successful");
-					}
-				}
-			}
-
-			@Override
-			public void onFailure(Throwable t) {
-				if (failOnError) {
-					thrownException = t;
-				} else {
-					LOG.warn("An exception occurred while processing a record", t);
-				}
-			}
-		};
-
-		if (this.customPartitioner != null) {
-			this.customPartitioner.initialize(getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks());
-		}
-
-		LOG.info("Started Kinesis producer instance for region '{}'", producerConfig.getRegion());
-	}
-
-	@Override
-	public void invoke(OUT value) throws Exception {
-		if (this.producer == null) {
-			throw new RuntimeException("Kinesis producer has been closed");
-		}
-		if (thrownException != null) {
-			String errorMessages = "";
-			if (thrownException instanceof UserRecordFailedException) {
-				List<Attempt> attempts = ((UserRecordFailedException) thrownException).getResult().getAttempts();
-				for (Attempt attempt: attempts) {
-					if (attempt.getErrorMessage() != null) {
-						errorMessages += attempt.getErrorMessage() +"\n";
-					}
-				}
-			}
-			if (failOnError) {
-				throw new RuntimeException("An exception was thrown while processing a record: " + errorMessages, thrownException);
-			} else {
-				LOG.warn("An exception was thrown while processing a record: {}", thrownException, errorMessages);
-				thrownException = null; // reset
-			}
-		}
-
-		String stream = defaultStream;
-		String partition = defaultPartition;
-
-		ByteBuffer serialized = schema.serialize(value);
-
-		// maybe set custom stream
-		String customStream = schema.getTargetStream(value);
-		if (customStream != null) {
-			stream = customStream;
-		}
-
-		String explicitHashkey = null;
-		// maybe set custom partition
-		if (customPartitioner != null) {
-			partition = customPartitioner.getPartitionId(value);
-			explicitHashkey = customPartitioner.getExplicitHashKey(value);
-		}
-
-		if (stream == null) {
-			if (failOnError) {
-				throw new RuntimeException("No target stream set");
-			} else {
-				LOG.warn("No target stream set. Skipping record");
-				return;
-			}
-		}
-
-		ListenableFuture<UserRecordResult> cb = producer.addUserRecord(stream, partition, explicitHashkey, serialized);
-		Futures.addCallback(cb, callback);
-	}
-
-	@Override
-	public void close() throws Exception {
-		LOG.info("Closing producer");
-		super.close();
-		KinesisProducer kp = this.producer;
-		this.producer = null;
-		if (kp != null) {
-			LOG.info("Flushing outstanding {} records", kp.getOutstandingRecordsCount());
-			// try to flush all outstanding records
-			while (kp.getOutstandingRecordsCount() > 0) {
-				kp.flush();
-				try {
-					Thread.sleep(500);
-				} catch (InterruptedException e) {
-					LOG.warn("Flushing was interrupted.");
-					// stop the blocking flushing and destroy producer immediately
-					break;
-				}
-			}
-			LOG.info("Flushing done. Destroying producer instance.");
-			kp.destroy();
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/KinesisPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/KinesisPartitioner.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/KinesisPartitioner.java
deleted file mode 100644
index bd23abe..0000000
--- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/KinesisPartitioner.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kinesis;
-
-
-import java.io.Serializable;
-
-public abstract class KinesisPartitioner<T> implements Serializable {
-
-	/**
-	 * Return a partition id based on the input
-	 * @param element Element to partition
-	 * @return A string representing the partition id
-	 */
-	public abstract String getPartitionId(T element);
-
-	/**
-	 * Optional method for setting an explicit hash key
-	 * @param element Element to get the hash key for
-	 * @return the hash key for the element
-	 */
-	public String getExplicitHashKey(T element) {
-		return null;
-	}
-
-	/**
-	 * Optional initializer.
-	 *
-	 * @param indexOfThisSubtask Index of this partitioner instance
-	 * @param numberOfParallelSubtasks Total number of parallel instances
-	 */
-	public void initialize(int indexOfThisSubtask, int numberOfParallelSubtasks) {
-		//
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/AWSConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/AWSConfigConstants.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/AWSConfigConstants.java
deleted file mode 100644
index 01d4f00..0000000
--- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/AWSConfigConstants.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kinesis.config;
-
-import com.amazonaws.auth.AWSCredentialsProvider;
-
-/**
- * Configuration keys for AWS service usage
- */
-public class AWSConfigConstants {
-
-	/**
-	 * Possible configuration values for the type of credential provider to use when accessing AWS Kinesis.
-	 * Internally, a corresponding implementation of {@link AWSCredentialsProvider} will be used.
-	 */
-	public enum CredentialProvider {
-
-		/** Look for the environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY to create AWS credentials */
-		ENV_VAR,
-
-		/** Look for Java system properties aws.accessKeyId and aws.secretKey to create AWS credentials */
-		SYS_PROP,
-
-		/** Use an AWS credentials profile file to create the AWS credentials */
-		PROFILE,
-
-		/** Simply create AWS credentials by supplying the AWS access key ID and AWS secret key in the configuration properties */
-		BASIC,
-
-		/** A credentials provider chain will be used that searches for credentials in this order: ENV_VAR, SYS_PROP, PROFILE, and finally the AWS instance metadata **/
-		AUTO,
-	}
-
-	/** The AWS region of the Kinesis streams to be pulled ("us-east-1" is used if not set) */
-	public static final String AWS_REGION = "aws.region";
-
-	/** The AWS access key ID to use when setting credentials provider type to BASIC */
-	public static final String AWS_ACCESS_KEY_ID = "aws.credentials.provider.basic.accesskeyid";
-
-	/** The AWS secret key to use when setting credentials provider type to BASIC */
-	public static final String AWS_SECRET_ACCESS_KEY = "aws.credentials.provider.basic.secretkey";
-
-	/** The credential provider type to use when AWS credentials are required (BASIC is used if not set)*/
-	public static final String AWS_CREDENTIALS_PROVIDER = "aws.credentials.provider";
-
-	/** Optional configuration for profile path if credential provider type is set to be PROFILE */
-	public static final String AWS_PROFILE_PATH = "aws.credentials.provider.profile.path";
-
-	/** Optional configuration for profile name if credential provider type is set to be PROFILE */
-	public static final String AWS_PROFILE_NAME = "aws.credentials.provider.profile.name";
-
-	/** The AWS endpoint for Kinesis (derived from the AWS region setting if not set) */
-	public static final String AWS_ENDPOINT = "aws.endpoint";
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
deleted file mode 100644
index 76c20ed..0000000
--- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kinesis.config;
-
-import com.amazonaws.services.kinesis.model.ShardIteratorType;
-import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
-import org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer;
-import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
-
-/**
- * Optional consumer specific configuration keys and default values for {@link FlinkKinesisConsumer}
- */
-public class ConsumerConfigConstants extends AWSConfigConstants {
-
-	/**
-	 * The initial position to start reading shards from. This will affect the {@link ShardIteratorType} used
-	 * when the consumer tasks retrieve the first shard iterator for each Kinesis shard.
-	 */
-	public enum InitialPosition {
-
-		/** Start reading from the earliest possible record in the stream (excluding expired data records) */
-		TRIM_HORIZON(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM),
-
-		/** Start reading from the latest incoming record */
-		LATEST(SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM);
-
-		private SentinelSequenceNumber sentinelSequenceNumber;
-
-		InitialPosition(SentinelSequenceNumber sentinelSequenceNumber) {
-			this.sentinelSequenceNumber = sentinelSequenceNumber;
-		}
-
-		public SentinelSequenceNumber toSentinelSequenceNumber() {
-			return this.sentinelSequenceNumber;
-		}
-	}
-
-	/** The initial position to start reading Kinesis streams from (LATEST is used if not set) */
-	public static final String STREAM_INITIAL_POSITION = "flink.stream.initpos";
-
-	/** The base backoff time between each describeStream attempt */
-	public static final String STREAM_DESCRIBE_BACKOFF_BASE = "flink.stream.describe.backoff.base";
-
-	/** The maximum backoff time between each describeStream attempt */
-	public static final String STREAM_DESCRIBE_BACKOFF_MAX = "flink.stream.describe.backoff.max";
-
-	/** The power constant for exponential backoff between each describeStream attempt */
-	public static final String STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT = "flink.stream.describe.backoff.expconst";
-
-	/** The maximum number of records to try to get each time we fetch records from an AWS Kinesis shard */
-	public static final String SHARD_GETRECORDS_MAX = "flink.shard.getrecords.maxrecordcount";
-
-	/** The maximum number of getRecords attempts if we get ProvisionedThroughputExceededException */
-	public static final String SHARD_GETRECORDS_RETRIES = "flink.shard.getrecords.maxretries";
-
-	/** The base backoff time between getRecords attempts if we get a ProvisionedThroughputExceededException */
-	public static final String SHARD_GETRECORDS_BACKOFF_BASE = "flink.shard.getrecords.backoff.base";
-
-	/** The maximum backoff time between getRecords attempts if we get a ProvisionedThroughputExceededException */
-	public static final String SHARD_GETRECORDS_BACKOFF_MAX = "flink.shard.getrecords.backoff.max";
-
-	/** The power constant for exponential backoff between each getRecords attempt */
-	public static final String SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT = "flink.shard.getrecords.backoff.expconst";
-
-	/** The interval between each getRecords request to an AWS Kinesis shard in milliseconds */
-	public static final String SHARD_GETRECORDS_INTERVAL_MILLIS = "flink.shard.getrecords.intervalmillis";
-
-	/** The maximum number of getShardIterator attempts if we get ProvisionedThroughputExceededException */
-	public static final String SHARD_GETITERATOR_RETRIES = "flink.shard.getiterator.maxretries";
-
-	/** The base backoff time between getShardIterator attempts if we get a ProvisionedThroughputExceededException */
-	public static final String SHARD_GETITERATOR_BACKOFF_BASE = "flink.shard.getiterator.backoff.base";
-
-	/** The maximum backoff time between getShardIterator attempts if we get a ProvisionedThroughputExceededException */
-	public static final String SHARD_GETITERATOR_BACKOFF_MAX = "flink.shard.getiterator.backoff.max";
-
-	/** The power constant for exponential backoff between each getShardIterator attempt */
-	public static final String SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT = "flink.shard.getiterator.backoff.expconst";
-
-	/** The interval between each attempt to discover new shards */
-	public static final String SHARD_DISCOVERY_INTERVAL_MILLIS = "flink.shard.discovery.intervalmillis";
-
-	// ------------------------------------------------------------------------
-	//  Default values for consumer configuration
-	// ------------------------------------------------------------------------
-
-	public static final String DEFAULT_STREAM_INITIAL_POSITION = InitialPosition.LATEST.toString();
-
-	public static final long DEFAULT_STREAM_DESCRIBE_BACKOFF_BASE = 1000L;
-
-	public static final long DEFAULT_STREAM_DESCRIBE_BACKOFF_MAX = 5000L;
-
-	public static final double DEFAULT_STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT = 1.5;
-
-	public static final int DEFAULT_SHARD_GETRECORDS_MAX = 100;
-
-	public static final int DEFAULT_SHARD_GETRECORDS_RETRIES = 3;
-
-	public static final long DEFAULT_SHARD_GETRECORDS_BACKOFF_BASE = 300L;
-
-	public static final long DEFAULT_SHARD_GETRECORDS_BACKOFF_MAX = 1000L;
-
-	public static final double DEFAULT_SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT = 1.5;
-
-	public static final long DEFAULT_SHARD_GETRECORDS_INTERVAL_MILLIS = 0L;
-
-	public static final int DEFAULT_SHARD_GETITERATOR_RETRIES = 3;
-
-	public static final long DEFAULT_SHARD_GETITERATOR_BACKOFF_BASE = 300L;
-
-	public static final long DEFAULT_SHARD_GETITERATOR_BACKOFF_MAX = 1000L;
-
-	public static final double DEFAULT_SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT = 1.5;
-
-	public static final long DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS = 10000L;
-
-	/**
-	 * To keep shard iterators from expiring in {@link ShardConsumer}s, the value for the configured
-	 * getRecords interval cannot exceed 5 minutes, which is the expiry time of retrieved iterators.
-	 */
-	public static final long MAX_SHARD_GETRECORDS_INTERVAL_MILLIS = 300000L;
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ProducerConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ProducerConfigConstants.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ProducerConfigConstants.java
deleted file mode 100644
index 1edddfc..0000000
--- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ProducerConfigConstants.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kinesis.config;
-
-import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
-
-/**
- * Optional producer specific configuration keys for {@link FlinkKinesisProducer}
- */
-public class ProducerConfigConstants extends AWSConfigConstants {
-
-	/** Maximum number of items to pack into a PutRecords request. **/
-	public static final String COLLECTION_MAX_COUNT = "aws.producer.collectionMaxCount";
-
-	/** Maximum number of items to pack into an aggregated record. **/
-	public static final String AGGREGATION_MAX_COUNT = "aws.producer.aggregationMaxCount";
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ConsumeFromKinesis.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ConsumeFromKinesis.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ConsumeFromKinesis.java
deleted file mode 100644
index 55668c6..0000000
--- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ConsumeFromKinesis.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kinesis.examples;
-
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
-import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
-
-import java.util.Properties;
-
-/**
- * This is an example of how to consume data from Kinesis
- */
-public class ConsumeFromKinesis {
-
-	public static void main(String[] args) throws Exception {
-		ParameterTool pt = ParameterTool.fromArgs(args);
-
-		StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
-		see.setParallelism(1);
-
-		Properties kinesisConsumerConfig = new Properties();
-		kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, pt.getRequired("region"));
-		kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, pt.getRequired("accesskey"));
-		kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, pt.getRequired("secretkey"));
-
-		DataStream<String> kinesis = see.addSource(new FlinkKinesisConsumer<>(
-			"flink-test",
-			new SimpleStringSchema(),
-			kinesisConsumerConfig));
-
-		kinesis.print();
-
-		see.execute();
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ProduceIntoKinesis.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ProduceIntoKinesis.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ProduceIntoKinesis.java
deleted file mode 100644
index d178137..0000000
--- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ProduceIntoKinesis.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kinesis.examples;
-
-import org.apache.commons.lang3.RandomStringUtils;
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
-import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
-
-import java.util.Properties;
-
-/**
- * This is an example of how to produce data into Kinesis
- */
-public class ProduceIntoKinesis {
-
-	public static void main(String[] args) throws Exception {
-		ParameterTool pt = ParameterTool.fromArgs(args);
-
-		StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
-		see.setParallelism(1);
-
-		DataStream<String> simpleStringStream = see.addSource(new EventsGenerator());
-
-		Properties kinesisProducerConfig = new Properties();
-		kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_REGION, pt.getRequired("region"));
-		kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_ACCESS_KEY_ID, pt.getRequired("accessKey"));
-		kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, pt.getRequired("secretKey"));
-
-		FlinkKinesisProducer<String> kinesis = new FlinkKinesisProducer<>(
-				new SimpleStringSchema(), kinesisProducerConfig);
-
-		kinesis.setFailOnError(true);
-		kinesis.setDefaultStream("flink-test");
-		kinesis.setDefaultPartition("0");
-
-		simpleStringStream.addSink(kinesis);
-
-		see.execute();
-	}
-
-	public static class EventsGenerator implements SourceFunction<String> {
-		private boolean running = true;
-
-		@Override
-		public void run(SourceContext<String> ctx) throws Exception {
-			long seq = 0;
-			while(running) {
-				Thread.sleep(10);
-				ctx.collect((seq++) + "-" + RandomStringUtils.randomAlphabetic(12));
-			}
-		}
-
-		@Override
-		public void cancel() {
-			running = false;
-		}
-	}
-}


Mime
View raw message