flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Lokesh Gowda <lokeshgowd...@gmail.com>
Subject Re: Apache Flink Reading CSV Files ,Transform and Writting Back to CSV using Paralliesm
Date Sat, 26 Aug 2017 05:04:20 GMT
 Hi Robert  my question was if I need to read and write the csv file  of
size which will be in gb how i can distribute the data sink to write into
files 1gb exactly and since I am
New to flink I am not sure about this


Regards
Lokesh.r

On Sat, Aug 26, 2017 at 2:56 AM Robert Metzger <rmetzger@apache.org> wrote:

> Hi Lokesh,
>
> I'm not sure if I fully understood your question. But you can not write
> the result in a single file from multiple writers.
> If you want to process the data fully distributed, you'll also have to
> write it distributed.
>
> On Wed, Aug 23, 2017 at 8:07 PM, Lokesh R <lokesh.r@ericsson.com> wrote:
>
>> Hi Team,
>>
>> I am using the apache flink with java for below problem statement
>>
>> 1.where i will read a csv file with field delimeter  character ;
>> 2.transform the fields
>> 3.write back the data back to csv
>>
>> my doubts are as below
>>
>> 1. if i need to read the csv file of size above 50 gb what would be the
>> approach
>> 2 if i use Parallelism i am not able to split the data and collect it
>> since its a csv file
>> and while writing a back to csv its creating a multiple files to write
>> the data using the default Parallelism how can achieve the same
>>
>> sample input is
>>
>> 000008000077;151139924603;3526358005322;2;29/07/2016:00:00:00;29/07/2018:00:00:00;20;4800019940
>>
>> and sample output is
>>
>>
>> 000008000077sfhsdfbs;151139924603;XXXXXXXXX;2;29/07/2016:00:00:00;29/07/2018:00:00:00;20;4800019940
>>
>>
>> below is the code which i am currently running on local environment
>>
>> ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
>>
>> package com.ericsson.voucher;
>>
>> import org.apache.flink.api.common.functions.GroupReduceFunction;
>> import org.apache.flink.api.common.functions.RichFlatMapFunction;
>> import org.apache.flink.api.java.DataSet;
>> import org.apache.flink.api.java.ExecutionEnvironment;
>> import org.apache.flink.api.java.tuple.Tuple1;
>> import org.apache.flink.api.java.tuple.Tuple8;
>>
>> import org.apache.flink.util.Collector;
>>
>>
>> public class Classification {
>>
>>     private static final String OUTPUT_PATH =
>> "C:\\Projects\\DM\\Pentaho\\OutPut\\output.csv";
>>
>>     public static void main(String[] args) throws Exception {
>>
>>
>>         ExecutionEnvironment env = ExecutionEnvironment
>>                 .getExecutionEnvironment();
>>         env.setParallelism(20);
>>
>>          long subViewStartTime = System.currentTimeMillis();
>>         DataSet<Tuple1<String>> rawdata = (DataSet<Tuple1<String>>)
env
>>                 .readCsvFile("C:\\Projects\\DM\\Pentaho\\CRV_EXPORT.csv")
>>                 .lineDelimiter("\n").types(String.class);
>>     DataSet<Tuple8<String,String, String, String, String, String, String,
>> String>> mails = rawdata
>>                 .flatMap(new DataExtractor()).rebalance();
>>                 mails.writeAsCsv(OUTPUT_PATH, "\n",
>> ";").setParallelism(1);
>>            mails.print();
>>            long subViewEndTime = System.currentTimeMillis();
>>
>>            long subViewDifference = subViewEndTime - subViewStartTime;
>>
>>            System.out.println("The Difference Time is"+
>> subViewDifference/1000 +"seconds");
>>
>>     }
>>
>>     public static class DataExtractor
>>             extends
>>             RichFlatMapFunction<Tuple1<String>, Tuple8<String, String,
>> String, String, String, String, String, String>> {
>>
>>         /**
>>          *
>>          */
>>         private static final long serialVersionUID = 1L;
>>
>>         public void flatMap(
>>                 Tuple1<String> paramIN,
>>                 org.apache.flink.util.Collector<Tuple8<String, String,
>> String, String, String, String, String, String>> out)
>>                 throws Exception {
>>             String[] lines = paramIN.f0.split(";");
>>             if (lines != null && lines.length > 0) {
>>                 String vocuherCode =lines[0];
>>                 vocuherCode=vocuherCode+"TEST1";
>>                 String VoucherId =  lines[1];
>>                 String voucherNumber = lines[2];
>>                 String status = lines[3]+"TWTSTST";
>>                 String startDate = lines[4] + "";
>>                 String endDate = lines[5] + "";
>>                 String endStatus = lines[6];
>>                 String endVoucherNumber = lines[7];
>>
>>
>>
>>
>>
>>             out.collect(new Tuple8<String, String, String, String,
>> String, String, String, String>(
>>                     vocuherCode, VoucherId, voucherNumber, status,
>>                     startDate, endDate, endStatus, endVoucherNumber));
>>             }
>>
>>         }
>>
>>     }
>>
>>     public static class RecordReducer
>>             implements
>>             GroupReduceFunction<Tuple8<String, String, String, String,
>> String, String, String, String>,
>>             Tuple8<String, String, String, String, String, String,
>> String, String>> {
>>
>>
>>         /**
>>          *
>>          */
>>         private static final long serialVersionUID =
>> -6045821605365596025L;
>>
>>         @Override
>>         public void reduce(
>>                 Iterable<Tuple8<String, String, String, String, String,
>> String, String, String>> paramIterable,
>>                 Collector<Tuple8<String, String, String, String, String,
>> String, String, String>> paramCollector)
>>                 throws Exception {
>>             // TODO Auto-generated method stub
>>             String vocuherCode = null;
>>             String VoucherId = null;
>>             String voucherNumber = null;
>>             String status = null;
>>             String startDate = null;
>>             String endDate = null;
>>             String endStatus = null;
>>             String endVoucherNumber = null;
>>             for (Tuple8<String, String, String, String, String, String,
>> String, String> m : paramIterable) {
>>                 vocuherCode = m.f0;
>>                 VoucherId = m.f1;
>>                 voucherNumber = m.f2;
>>                 status = m.f3;
>>                 startDate = m.f4;
>>                 endDate = m.f5;
>>                 endStatus = m.f6;
>>                 endVoucherNumber = m.f7;
>>                 paramCollector
>>                 .collect(new Tuple8<String, String, String, String,
>> String, String, String, String>(
>>                         vocuherCode, VoucherId, voucherNumber, status,
>>                         startDate, endDate, endStatus, endVoucherNumber));
>>
>>             }
>>
>>
>>
>>         }
>>     }
>>
>>
>> }
>>
>> ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
>>
>> please help me on the same how can I achieve the portioning of fields on
>> the above data and achieve the parallism to increase the throughput of my
>> application
>>
>>
>>
>>
>>
>>
>>
>
>

Mime
View raw message