flink-user mailing list archives

From Samra Kasim <samra.ka...@thehumangeo.com>
Subject Reading and Writing to S3
Date Tue, 10 Jan 2017 20:16:55 GMT
Hi,

I am new to Flink and have written two small test projects: 1) one that
reads data from S3 and 2) one that pushes data to S3. However, each project
fails with a different error, and I think both relate to how the
core-site.xml file is being read. I am running the projects locally in
IntelliJ, with the environment variable
HADOOP_HOME=path/to/dir-with-core-site.xml set in the run configuration. I
have also tried saving core-site.xml in the src/main/resources folder, but I
get the same errors. Is my core-site.xml configured correctly for s3a, and
how do I get IntelliJ to read it? Also, do the core-site.xml settings differ
for reading from versus writing to S3?
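
One way to check whether the JVM launched by IntelliJ can see the file at
all is sketched below. This is an illustration only: the class name
ClasspathCheck and the method locate are hypothetical, and the check says
nothing about whether Flink 1.1.4 itself consults the classpath or
HADOOP_HOME when it builds its Hadoop configuration.

```java
import java.net.URL;

public class ClasspathCheck {

    /** Returns the URL of a classpath resource, or a marker string if it is absent. */
    public static String locate(String resource) {
        URL url = Thread.currentThread().getContextClassLoader().getResource(resource);
        return url == null ? "not on classpath" : url.toString();
    }

    public static void main(String[] args) {
        // A file under src/main/resources should appear here once it is copied to the build output.
        System.out.println("core-site.xml: " + locate("core-site.xml"));
        // Shows what the run configuration actually passed to the process (null if unset).
        System.out.println("HADOOP_HOME:   " + System.getenv("HADOOP_HOME"));
    }
}
```

Running it with the same run configuration as the failing job shows whether
the two locations described above are visible to the process.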

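This is my code for reading data from S3:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class DesktopWriter {

    public static void main(String[] args) throws Exception {
        // Run the job inside the IDE's JVM
        ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();

        // Read the file line by line from the bucket
        DataSet<String> data = env.readTextFile("s3://flink-test/flink-test.txt");

        // print() triggers execution for DataSet programs
        data.print();
    }
}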

I get the error: Caused by: java.io.IOException: Cannot determine access
key to Amazon S3. Please make sure to configure it by setting the
configuration key 'fs.s3.accessKey'.

This is my code for writing to S3:

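import java.util.Map;

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class S3Sink {
    public static void main(String[] args) throws Exception {
        // Load job settings (ConfigUtils is not shown in this message)
        Map<String, String> configs = ConfigUtils.loadConfigs("path/to/config.yaml");

        final ParameterTool parameterTool = ParameterTool.fromMap(configs);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().disableSysoutLogging();
        env.getConfig().setGlobalJobParameters(parameterTool);

        // Consume string messages from the configured Kafka topic
        DataStream<String> messageStream = env
                .addSource(new FlinkKafkaConsumer09<String>(
                        parameterTool.getRequired("kafka.topic"),
                        new SimpleStringSchema(),
                        parameterTool.getProperties()));

        // Write the stream to S3 with a single writer
        messageStream.writeAsText("s3a://flink-test/flinktest.txt").setParallelism(1);

        env.execute();
    }
}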

I get the error: Caused by: java.io.IOException: The given file URI
(s3://flink-test/flinktest.txt) points to the HDFS NameNode at flink-test,
but the File System could not be initialized with that address: Unable to
load AWS credentials from any provider in the chain

This is my core-site.xml:

<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://localhost:9000</value>
    </property>
    <property>
        <name>fs.s3.impl</name>
        <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
    </property>

    <!-- Comma separated list of local directories used to buffer
         large results prior to transmitting them to S3. -->
    <property>
        <name>fs.s3a.buffer.dir</name>
        <value>/tmp</value>
    </property>

    <!-- set your AWS ID using key defined in
         org.apache.hadoop.fs.s3a.Constants -->
    <property>
        <name>fs.s3a.awsAccessKeyId</name>
        <value>*****</value>
    </property>

    <!-- set your AWS access key -->
    <property>
        <name>fs.s3a.awsSecretAccessKey</name>
        <value>*****</value>
    </property>
</configuration>
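
A quick way to see which credential properties a core-site.xml actually
defines is to parse it directly. The sketch below is an illustration under
stated assumptions: the class name CoreSiteCheck and the method missingKeys
are hypothetical, and it assumes the S3A property names fs.s3a.access.key
and fs.s3a.secret.key given in the hadoop-aws 2.7 documentation, which
differ from the names used in the file above. It uses only the JDK's XML
parser, not Hadoop's own Configuration class.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.NodeList;

public class CoreSiteCheck {

    // Assumed S3A credential property names (per the hadoop-aws 2.7.x documentation).
    static final String[] EXPECTED = {"fs.s3a.access.key", "fs.s3a.secret.key"};

    /** Returns the expected property names that the given core-site.xml text does not define. */
    public static List<String> missingKeys(String xml) throws Exception {
        Document doc = DocumentBuilderFactory.newInstance()
                .newDocumentBuilder()
                .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));

        // Collect the text of every <name> element in the file.
        Set<String> defined = new HashSet<>();
        NodeList names = doc.getElementsByTagName("name");
        for (int i = 0; i < names.getLength(); i++) {
            defined.add(names.item(i).getTextContent().trim());
        }

        // Keep the expected names that were not found, in declaration order.
        List<String> missing = new ArrayList<>();
        for (String key : EXPECTED) {
            if (!defined.contains(key)) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) throws Exception {
        // Same credential property names as the core-site.xml above, with placeholder values.
        String xml = "<configuration>"
                + "<property><name>fs.s3a.awsAccessKeyId</name><value>x</value></property>"
                + "<property><name>fs.s3a.awsSecretAccessKey</name><value>x</value></property>"
                + "</configuration>";
        System.out.println("Missing: " + missingKeys(xml));
    }
}
```

An empty result would mean both assumed names are present; it does not
confirm that the file is being loaded or that the credentials are valid.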

This is my pom.xml:

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.1.4</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.10</artifactId>
        <version>1.1.4</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.10</artifactId>
        <version>1.1.4</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
        <version>1.1.4</version>
    </dependency>

    <dependency>
        <groupId>com.amazonaws</groupId>
        <artifactId>aws-java-sdk</artifactId>
        <version>1.7.4</version>
    </dependency>

    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-aws</artifactId>
        <version>2.7.2</version>
    </dependency>

    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpclient</artifactId>
        <version>4.2.5</version>
    </dependency>

    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpcore</artifactId>
        <version>4.2.5</version>
    </dependency>
</dependencies>

Thanks!
Sam
