flink-issues mailing list archives

From "Aljoscha Krettek (JIRA)" <j...@apache.org>
Subject [jira] [Assigned] (FLINK-9618) NullPointerException in FlinkKinesisProducer when aws.region is not set and aws.endpoint is set
Date Thu, 12 Jul 2018 09:43:00 GMT

     [ https://issues.apache.org/jira/browse/FLINK-9618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Aljoscha Krettek reassigned FLINK-9618:
---------------------------------------

    Assignee: Aaron Langford

> NullPointerException in FlinkKinesisProducer when aws.region is not set and aws.endpoint is set
> -----------------------------------------------------------------------------------------------
>
>                 Key: FLINK-9618
>                 URL: https://issues.apache.org/jira/browse/FLINK-9618
>             Project: Flink
>          Issue Type: Bug
>          Components: Kinesis Connector
>    Affects Versions: 1.5.0
>         Environment: N/A
>            Reporter: Aaron Langford
>            Assignee: Aaron Langford
>            Priority: Minor
>   Original Estimate: 3h
>  Remaining Estimate: 3h
>
> This problem arose while trying to write to a local kinesalite instance. Specifying both aws.region and aws.endpoint is not allowed. However, when aws.region is absent, a NullPointerException is thrown.
> Here is some example Scala code:
> {code:java}
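> import java.util.Properties
> import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer
> import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants
>
> /**
>   * Builds a Kinesis sink for ProcessedMobilePageView records.
>   *
>   * @param region the AWS region the stream lives in
>   * @param streamName the stream to write records to
>   * @param endpoint if in local dev, this points to a kinesalite instance
>   * @return the configured producer
>   */
> def getSink(region: String,
>             streamName: String,
>             endpoint: Option[String]): FlinkKinesisProducer[ProcessedMobilePageView] = {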
>   val props = new Properties()
>   props.put(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO")
>   endpoint match {
>     case Some(uri) => props.put(AWSConfigConstants.AWS_ENDPOINT, uri)
>     case None => props.put(AWSConfigConstants.AWS_REGION, region)
>   }
>   val producer = new FlinkKinesisProducer[ProcessedMobilePageView](
>     new JsonSerializer[ProcessedMobilePageView](DefaultSerializationBuilder),
>     props
>   )
>   producer.setDefaultStream(streamName)
>   producer
> }
> {code}
> To produce the NullPointerException, pass in `Some("localhost:4567")` for endpoint.
> The error originates in org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil.java, on line 194. This line should check whether aws.endpoint is present before reading it from the Properties object.
>  
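The kind of check described above could look roughly like the following. This is only a minimal sketch, not the actual KinesisConfigUtil code or the eventual fix: the class `EndpointOrRegionCheck` and the method `describeTarget` are hypothetical names, and only the key names aws.region and aws.endpoint come from the issue. It rejects a configuration that sets both keys or neither, so a missing aws.region surfaces as a descriptive IllegalArgumentException rather than a NullPointerException.

```java
import java.util.Properties;

// Hypothetical helper for illustration; not part of the Flink Kinesis connector.
class EndpointOrRegionCheck {
    // Key names as given in the issue title.
    static final String AWS_REGION = "aws.region";
    static final String AWS_ENDPOINT = "aws.endpoint";

    /**
     * Returns "region:<value>" or "endpoint:<value>" depending on which key is set.
     * Fails with IllegalArgumentException when both keys or neither key is present.
     */
    static String describeTarget(Properties config) {
        // getProperty returns null for a missing key, so check before using the values.
        String region = config.getProperty(AWS_REGION);
        String endpoint = config.getProperty(AWS_ENDPOINT);

        if (region != null && endpoint != null) {
            throw new IllegalArgumentException(
                "Set only one of " + AWS_REGION + " and " + AWS_ENDPOINT + ", not both.");
        }
        if (region == null && endpoint == null) {
            throw new IllegalArgumentException(
                "Set either " + AWS_REGION + " or " + AWS_ENDPOINT + ".");
        }
        return region != null ? "region:" + region : "endpoint:" + endpoint;
    }

    public static void main(String[] args) {
        // Mirrors the failing case from the report: endpoint set, region absent.
        Properties props = new Properties();
        props.setProperty(AWS_ENDPOINT, "localhost:4567");
        System.out.println(describeTarget(props));
    }
}
```

With a check of this shape, the kinesalite setup from the report (endpoint only) is accepted, while an empty or contradictory configuration fails with a message that names the offending keys.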



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
