flink-issues mailing list archives

From tzulitai <...@git.apache.org>
Subject [GitHub] flink pull request #5171: [FLINK-8271][Kinesis connector] upgrade deprecated...
Date Wed, 20 Dec 2017 23:50:34 GMT
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5171#discussion_r158166090
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java ---
    @@ -30,37 +30,44 @@
     import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
     import com.amazonaws.auth.SystemPropertiesCredentialsProvider;
     import com.amazonaws.auth.profile.ProfileCredentialsProvider;
    -import com.amazonaws.regions.Region;
    +import com.amazonaws.client.builder.AwsClientBuilder;
     import com.amazonaws.regions.Regions;
    -import com.amazonaws.services.kinesis.AmazonKinesisClient;
    +import com.amazonaws.services.kinesis.AmazonKinesis;
    +import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
     
     import java.util.Properties;
     
     /**
      * Some utilities specific to Amazon Web Service.
      */
     public class AWSUtil {
    +	/** Used for formatting Flink-specific user agent string when creating Kinesis client. */
    +	private static final String USER_AGENT_FORMAT = "Apache Flink %s (%s) Kinesis Connector";
     
     	/**
    -	 * Creates an Amazon Kinesis Client.
    +	 * Creates an AmazonKinesis client.
     	 * @param configProps configuration properties containing the access key, secret key, and region
    -	 * @return a new Amazon Kinesis Client
    +	 * @return a new AmazonKinesis client
     	 */
    -	public static AmazonKinesisClient createKinesisClient(Properties configProps) {
    +	public static AmazonKinesis createKinesisClient(Properties configProps) {
     		// set a Flink-specific user agent
    -		ClientConfiguration awsClientConfig = new ClientConfigurationFactory().getConfig();
    -		awsClientConfig.setUserAgent("Apache Flink " + EnvironmentInformation.getVersion() +
    -			" (" + EnvironmentInformation.getRevisionInformation().commitId + ") Kinesis Connector");
    +		ClientConfiguration awsClientConfig = new ClientConfigurationFactory().getConfig()
    +				.withUserAgentPrefix(String.format(USER_AGENT_FORMAT,
    +														EnvironmentInformation.getVersion(),
    +														EnvironmentInformation.getRevisionInformation().commitId));
     
     		// utilize automatic refreshment of credentials by directly passing the AWSCredentialsProvider
    -		AmazonKinesisClient client = new AmazonKinesisClient(
    -			AWSUtil.getCredentialsProvider(configProps), awsClientConfig);
    +		AmazonKinesisClientBuilder builder = AmazonKinesisClientBuilder.standard()
    +				.withCredentials(AWSUtil.getCredentialsProvider(configProps))
    +				.withClientConfiguration(awsClientConfig)
    +				.withRegion(Regions.fromName(configProps.getProperty(AWSConfigConstants.AWS_REGION)));
     
    -		client.setRegion(Region.getRegion(Regions.fromName(configProps.getProperty(AWSConfigConstants.AWS_REGION))));
     		if (configProps.containsKey(AWSConfigConstants.AWS_ENDPOINT)) {
    -			client.setEndpoint(configProps.getProperty(AWSConfigConstants.AWS_ENDPOINT));
    +			builder.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(
    +													configProps.getProperty(AWSConfigConstants.AWS_ENDPOINT),
    +													configProps.getProperty(AWSConfigConstants.AWS_REGION)));
    --- End diff --
    
    Why does the endpoint configuration have a region now?
    
    For example, let's say a user wants to test the connector against a local Kinesis mock service at "localhost:1111". The user was originally issuing requests against the regular AWS Kinesis service in region "us-west-1". The user's properties would look like this:
    ```
    configProps.setProperty(AWSConfigConstants.AWS_REGION, "us-west-1");
    configProps.setProperty(AWSConfigConstants.AWS_ENDPOINT, "localhost:1111");
    ```
    
    In the past, this would correctly redirect requests to "localhost:1111".
    
    With this change, is that still the case? Or do we actually need to call `new AwsClientBuilder.EndpointConfiguration(configProps.getProperty(AWSConfigConstants.AWS_ENDPOINT), null)` instead (i.e., not provide a region in the endpoint configuration)?
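
    To make the question concrete, here is a minimal stdlib-only sketch of the selection logic under discussion. It does not call the AWS SDK; `EndpointSelectionSketch` and `describeClientTarget` are hypothetical names, and the property keys are assumed stand-ins for `AWSConfigConstants.AWS_REGION` and `AWSConfigConstants.AWS_ENDPOINT`. As far as I understand the AWS SDK for Java 1.x, the second argument of `EndpointConfiguration` is the signing region rather than a routing region, and the client builder may reject having both `withRegion` and `withEndpointConfiguration` set, so the sketch treats the two as mutually exclusive.

```java
import java.util.Properties;

// Hypothetical illustration only; this is not the connector's code.
class EndpointSelectionSketch {
    // Assumed stand-ins for AWSConfigConstants.AWS_REGION / AWS_ENDPOINT.
    static final String AWS_REGION = "aws.region";
    static final String AWS_ENDPOINT = "aws.endpoint";

    /**
     * Describes which target a client would be configured with:
     * an explicit endpoint (the region is kept only for request signing)
     * when one is set, otherwise the region alone.
     */
    static String describeClientTarget(Properties configProps) {
        String region = configProps.getProperty(AWS_REGION);
        if (configProps.containsKey(AWS_ENDPOINT)) {
            // The endpoint override takes precedence over the region for routing.
            return "endpoint=" + configProps.getProperty(AWS_ENDPOINT)
                    + ", signingRegion=" + region;
        }
        return "region=" + region;
    }

    public static void main(String[] args) {
        Properties configProps = new Properties();
        configProps.setProperty(AWS_REGION, "us-west-1");
        configProps.setProperty(AWS_ENDPOINT, "localhost:1111");
        System.out.println(describeClientTarget(configProps));
    }
}
```

    If that reading of the SDK is right, the region set in the user's properties would only affect signing once an endpoint is given, and requests would still go to "localhost:1111".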

