flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "M. Dale" <medal...@yahoo.com>
Subject Re: Reading and Writing to S3
Date Wed, 11 Jan 2017 17:40:16 GMT
Sam,   Don't point the variables at files, point them at the directories containing the files.
Do you have fs.s3.impl property defined?
Concrete example:
/home/markus/hadoop-config directory has one file "core-site.xml" with thefollowing content:
<configuration>    <property>        <name>fs.s3.impl</name> 
      <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>    </property>
    <!-- Comma separated list of local directories used to buffer         large
results prior to transmitting them to S3. -->    <property>        <name>fs.s3a.buffer.dir</name> 
      <value>/tmp</value>    </property>
    <!-- set your AWS ID using key defined in org.apache.hadoop.fs.s3a.Constants --> 
  <property>        <name>fs.s3a.access.key</name>        <value>YOUR_ACCESS_KEY</value> 
  </property>
    <!-- set your AWS access key -->    <property>        <name>fs.s3a.secret.key</name> 
      <value>YOUR_SECRET_KEY</value>    </property></configuration>
/home/markus/flink-config directory has one file "flink-conf.yaml" with the following content
point hadoopconf to the DIRECTORY containing core-site.xml:
fs.hdfs.hadoopconf: /home/markus/hadoop-config
In IntelliJ, go to Run - Edit Configurations - <your run configuration> andset the FLINK_CONF_DIR
environment variable to point to the directory containingflink-conf.yaml (i.e in my case /home/markus/flink-config).
So everything is pointing to directories where the code looks for well-known filenames.
With that, the following works to write to S3. (Maybe load events from collection at first):
events.writeAsText("s3://<bucket>/<prefix-dir>")

env.execute 

    On Wednesday, January 11, 2017 10:44 AM, Samra Kasim <samra.kasim@thehumangeo.com>
wrote:
 

 Hi Markus,
Thanks for your help. I created an environment variable in IntelliJ for FLINK_CONF_DIR to
point to the flink-conf.yaml and in it defined fs.hdfs.hadoopconf to point to the core-site.xml,
but when I do that, I get the error: java.io.IOException: No file system found with scheme
s3, referenced in file URI 's3://flink-test/ flinkoutputtest.txt'.
I have been able to get it to work by using the environment variable HADOOP_HOME to point
directly to the core-site.xml, but when I do that and I push data from Kafka, I can see the
message stream printed to my terminal, but no file gets saved to s3. I also don't see any
errors. I have the correct AWS access id and key because i am able to read from files on s3
using Flink.
My code is below:    public static voidmain(String[] args) throws Exception {       
Map<String,String> configs = ConfigUtils.loadConfigs("/ path/to/src/main/resources/
error-queue.yaml");         finalParameterTool parameterTool = ParameterTool.fromMap(configs)
;        StreamExecutionEnvironment env = StreamExecutionEnvironment. getExecutionEnvironment();       env.getConfig().
disableSysoutLogging();       env.getConfig(). setGlobalJobParameters( parameterTool)        DataStream<String>
messageStream = env               .addSource(new FlinkKafkaConsumer09<String>(                       parameterTool.getRequired("
kafka.topic"),                       new SimpleStringSchema(),                       parameterTool.getProperties())
);        messageStream.print();        messageStream.writeAsText("s3: //flink-test/flinkoutputtest.
txt").setParallelism(1);         env.execute();
On Tue, Jan 10, 2017 at 4:06 PM, M. Dale <medale94@yahoo.com> wrote:

Sam,  I just happened to answer a similar question on Stackoverflow at Does Apache Flink
AWS S3 Sink require Hadoop for local testing?. I also submitted a PR to make that (for me)
a little clearer on the Apache Flink documentation (https://github.com/apache/fli nk/pull/3054/files).
 
|  
|  
|  
|   |    |

  |

  |
|  
|   |  
Does Apache Flink AWS S3 Sink require Hadoop for local testing?
 I am relatively new to Apache Flink and I am trying to create a simple project that produces
a file to an AWS S3...  |   |

  |

  |

 
Let me know if that works for you.
Thanks,Markus 

    On Tuesday, January 10, 2017 3:17 PM, Samra Kasim <samra.kasim@thehumangeo.com>
wrote:
 

 Hi,
I am new to Flink and I've written two small test projects: 1) to read data from s3 and 2)
to push data to s3. However, I am getting two different errors for the projects relating to,
i think, how the core-site.xml file is being read. I am running the project locally in IntelliJ.
I have the environment variable in run configurations set to HADOOP_HOME=path/to/dir-with-c
ore-site.xml. I have also tried saving the core-site.xml in the src/main/resources folder
but get the same errors. I want to know if my core-site.xml file is configured correctly for
using s3a and how to have IntelliJ read the core-site.xml file? Also, are the core-site.xml
configurations different for reading versus writing to s3?
This is my code for reading data from s3:
public class DesktopWriter {     public static voidmain(String[] args) throws Exception
{        ExecutionEnvironment env =ExecutionEnvironment.createLoc alEnvironment();       DataSet<String>
data = env.readTextFile("s3://flink-t est/flink-test.txt");        data.print();   
}}I get the error: Caused by: java.io.IOException: Cannot determine access key to Amazon
S3. Please make sure to configure it by setting the configuration key 'fs.s3.accessKey'.This
is my code for writing to S3:public class S3Sink {
    public static void main(String[] args) throws Exception {
        Map<String, String> configs = ConfigUtils.loadConfigs(“path/ to/config.yaml");

        final ParameterTool parameterTool = ParameterTool.fromMap(configs) ;

        StreamExecutionEnvironment env = StreamExecutionEnvironment.get ExecutionEnvironment();
        env.getConfig(). disableSysoutLogging();
        env.getConfig(). setGlobalJobParameters( parameterTool); 

        DataStream<String> messageStream = env
                .addSource(new FlinkKafkaConsumer09<String>(
                        parameterTool.getRequired(" kafka.topic"),
                        new SimpleStringSchema(),
                        parameterTool.getProperties()) );

        messageStream.writeAsText(" s3a://flink-test/flinktest.txt ").setParallelism(1);

        env.execute();
    }I get the error: Caused by: java.io.IOException: The given file URI (s3://flink-test/flinktest.txt
) points to the HDFS NameNode at flink-test, but the File System could not be initialized
with that address: Unable to load AWS credentials from any provider in the chain
This is my core-site.xml:
<configuration>    <property>        <name>fs.defaultFS</name> 
      <value>hdfs://localhost:9000</ value>    </property>    <property> 
      <name>fs.s3.impl</name>        <value>org.apache.hadoop.fs.
s3a.S3AFileSystem</value>    </property>
    <!-- Comma separated list of local directories used to buffer         large
results prior to transmitting them to S3. -->    <property>        <name>fs.s3a.buffer.dir</name> 
      <value>/tmp</value>    </property>
    <!-- set your AWS ID using key defined in org.apache.hadoop.fs.s3a. Constants --> 
  <property>        <name>fs.s3a.awsAccessKeyId</ name>       
<value>*****</value>    </property>
    <!-- set your AWS access key -->    <property>        <name>fs.s3a.
awsSecretAccessKey</name>        <value>*****</value>    </property></configuration>This
is my pom.xml:<dependencies>    <dependency>       <groupId>org.apache.flink</gro
upId>       <artifactId>flink-java</artifa ctId>       <version>1.1.4</version>   </dependency>    
<dependency>       <groupId>org.apache.flink</gro upId>       
<artifactId>flink-streaming-ja va_2.10</artifactId>       <version>1.1.4</version>   </dependency>    
<dependency>       <groupId>org.apache.flink</gro upId>       <artifactId>flink-clients_2.10
</artifactId>       <version>1.1.4</version>   </dependency>    
<dependency>       <groupId>org.apache.flink</gro upId>       <artifactId>flink-connector-ka
fka-0.9_2.10</artifactId>       <version>1.1.4</version>   </dependency>    
<dependency>       <groupId>com.amazonaws</groupI d>        <artifactId>aws-java-sdk
</artifactId>       <version>1.7.4</version>   </dependency>    
<dependency>       <groupId>org.apache.hadoop</gr oupId>       <artifactId>hadoop-aws</artifa
ctId>       <version>2.7.2</version>   </dependency>     <dependency>       <groupId>org.apache.httpcompon
ents</groupId>       <artifactId>httpclient</artifa ctId>       <version>4.2.5</version>   </dependency>   
<dependency>       <groupId>org.apache.httpcompon ents</groupId>       <artifactId>httpcore</artifact
Id>       <version>4.2.5</version>   </dependency></dependencies>
Thanks!Sam

   




   
Mime
View raw message