flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject [3/3] flink git commit: [FLINK-3229] Shade Google Protobuf into Kinesis connector to avoid runtime version conflict
Date Wed, 18 May 2016 13:04:14 GMT
[FLINK-3229] Shade Google Protobuf into Kinesis connector to avoid runtime version conflict


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fd3dba6e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fd3dba6e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fd3dba6e

Branch: refs/heads/master
Commit: fd3dba6eef668ca8885523e0d8546603da095825
Parents: 8673cee
Author: Robert Metzger <rmetzger@apache.org>
Authored: Fri Apr 29 15:20:18 2016 +0200
Committer: Robert Metzger <rmetzger@apache.org>
Committed: Wed May 18 11:38:43 2016 +0200

----------------------------------------------------------------------
 .../flink-connector-kinesis/pom.xml             | 34 ++++++++++++++++++++
 .../kinesis/FlinkKinesisConsumer.java           |  8 +++--
 .../kinesis/FlinkKinesisProducer.java           |  2 ++
 .../kinesis/internals/KinesisDataFetcher.java   |  3 +-
 4 files changed, 43 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fd3dba6e/flink-streaming-connectors/flink-connector-kinesis/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/pom.xml b/flink-streaming-connectors/flink-connector-kinesis/pom.xml
index ce49fd7..60cbe2e 100644
--- a/flink-streaming-connectors/flink-connector-kinesis/pom.xml
+++ b/flink-streaming-connectors/flink-connector-kinesis/pom.xml
@@ -121,6 +121,40 @@ under the License.
 					</execution>
 				</executions>
 			</plugin>
+
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>shade-flink</id>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<promoteTransitiveDependencies>true</promoteTransitiveDependencies>
+							<artifactSet combine.children="append">
+								<includes>
+									<include>com.amazonaws:*</include>
+									<include>com.google.protobuf:*</include>
+								</includes>
+							</artifactSet>
+							<relocations combine.children="override">
+								<!-- DO NOT RELOCATE GUAVA IN THIS PACKAGE -->
+								<relocation>
+									<pattern>org.objectweb.asm</pattern>
+									<shadedPattern>org.apache.flink.shaded.org.objectweb.asm</shadedPattern>
+								</relocation>
+								<relocation>
+									<pattern>com.google.protobuf</pattern>
+									<shadedPattern>org.apache.flink.kinesis.shaded.com.google.protobuf</shadedPattern>
+								</relocation>
+							</relocations>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
 		</plugins>
 	</build>
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/fd3dba6e/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
index 97cd048..1016251 100644
--- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
+++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
@@ -412,7 +412,8 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T>
 			// so the Access Key ID and Secret Key must be given
 			if (!config.containsKey(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID)
 				|| !config.containsKey(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY))
{
-				throw new IllegalArgumentException("Need to set values for AWS Access Key ID and Secret
Key when using the BASIC AWS credential provider type.");
+				throw new IllegalArgumentException("Please set values for AWS Access Key ID ('"+KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID+"')
" +
+						"and Secret Key ('" + KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY
+ "') when using the BASIC AWS credential provider type.");
 			}
 		} else {
 			String credentialsProviderType = config.getProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_TYPE);
@@ -433,13 +434,14 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T>
 			if (providerType == CredentialProviderType.BASIC) {
 				if (!config.containsKey(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID)
 					|| !config.containsKey(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY))
{
-					throw new IllegalArgumentException("Need to set values for AWS Access Key ID and Secret
Key when using the BASIC AWS credential provider type.");
+					throw new IllegalArgumentException("Please set values for AWS Access Key ID ('"+KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID+"')
" +
+							"and Secret Key ('" + KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY
+ "') when using the BASIC AWS credential provider type.");
 				}
 			}
 		}
 
 		if (!config.containsKey(KinesisConfigConstants.CONFIG_AWS_REGION)) {
-			throw new IllegalArgumentException("The AWS region must be set in the config.");
+			throw new IllegalArgumentException("The AWS region ('" + KinesisConfigConstants.CONFIG_AWS_REGION
+ "') must be set in the config.");
 		} else {
 			// specified AWS Region name must be recognizable
 			if (!AWSUtil.isValidRegion(config.getProperty(KinesisConfigConstants.CONFIG_AWS_REGION)))
{

http://git-wip-us.apache.org/repos/asf/flink/blob/fd3dba6e/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
index 9c9bd28..39bb70e 100644
--- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
+++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
@@ -163,6 +163,8 @@ public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT>
{
 		KinesisProducerConfiguration config = new KinesisProducerConfiguration();
 		config.setRegion(this.region);
 		config.setCredentialsProvider(new StaticCredentialsProvider(new BasicAWSCredentials(this.accessKey,
this.secretKey)));
+		//config.setCollectionMaxCount(1);
+		//config.setAggregationMaxCount(1);
 		producer = new KinesisProducer(config);
 		callback = new FutureCallback<UserRecordResult>() {
 			@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/fd3dba6e/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
index 9bf8468..6aaad8a 100644
--- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
+++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
@@ -21,6 +21,7 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
 import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
+import org.apache.flink.util.InstantiationUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -105,7 +106,7 @@ public class KinesisDataFetcher {
 		ArrayList<ShardConsumerThread<?>> consumerThreads = new ArrayList<>(assignedShardsWithStartingSequenceNum.size());
 		for (Map.Entry<KinesisStreamShard, String> assignedShard : assignedShardsWithStartingSequenceNum.entrySet())
{
 			ShardConsumerThread<T> thread = new ShardConsumerThread<>(this, configProps,
assignedShard.getKey(),
-				assignedShard.getValue(), sourceContext, deserializationSchema, lastSequenceNums);
+				assignedShard.getValue(), sourceContext, InstantiationUtil.clone(deserializationSchema),
lastSequenceNums);
 			thread.setName(String.format("ShardConsumer - %s - %s/%s",
 				taskName, assignedShard.getKey().getStreamName() ,assignedShard.getKey().getShardId()));
 			thread.setDaemon(true);


Mime
View raw message