flink-user mailing list archives

From Lokesh R <lokes...@ericsson.com>
Subject Apache Flink: Reading CSV Files, Transforming and Writing Back to CSV Using Parallelism
Date Wed, 23 Aug 2017 18:07:22 GMT
Hi Team,

I am using Apache Flink with Java for the problem statement below:

1. Read a CSV file that uses the field delimiter character ';'.
2. Transform the fields.
3. Write the data back to CSV (a rough sketch of this pipeline follows below).
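
For reference only, a minimal sketch of such a read-transform-write job with the DataSet API;
the paths, the eight String fields and the class name are placeholders, and
fieldDelimiter(";") lets the reader split the ';'-separated fields directly:

++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple8;

public class CsvReadWriteSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Let the CSV reader split on ';' itself; the path and the eight
        // String columns are placeholders.
        DataSet<Tuple8<String, String, String, String, String, String, String, String>> records =
                env.readCsvFile("file:///path/to/input.csv")
                        .fieldDelimiter(";")
                        .types(String.class, String.class, String.class, String.class,
                                String.class, String.class, String.class, String.class);

        // ... field transformations would go here ...

        // Write back with '\n' between records and ';' between fields.
        records.writeAsCsv("file:///path/to/output.csv", "\n", ";");

        env.execute("csv read-transform-write sketch");
    }
}
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++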

My doubts are as follows:

1. If I need to read a CSV file larger than 50 GB, what would be the right approach?
2. When I use parallelism I am not able to split the data and collect it, since it is a CSV file,
and when writing back to CSV the job creates multiple output files under the default
parallelism. How can I get a single output file? (See the sketch after this list.)
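
On both points, a rough sketch under the assumption of the 1.x DataSet API (the paths,
parallelism value and class name are made up): Flink's file input formats split a large,
uncompressed file into input splits, so the parallel source tasks read different ranges of it
concurrently, and only the sink needs parallelism 1 to end up with a single output file:

++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.core.fs.FileSystem;

public class SingleFileSinkSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(8);  // reading and transforming run with this parallelism

        // A large, uncompressed file is divided into input splits, so the
        // parallel source tasks each read a different range of it.
        DataSet<Tuple1<String>> lines =
                env.readCsvFile("file:///path/to/big-input.csv")  // placeholder path
                        .lineDelimiter("\n")
                        .types(String.class);

        // Only the sink is forced to parallelism 1, which produces a single
        // output file instead of one file per parallel sink task.
        lines.writeAsCsv("file:///path/to/output.csv", "\n", ";",
                        FileSystem.WriteMode.OVERWRITE)
                .setParallelism(1);

        env.execute("single output file sketch");
    }
}
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++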

Sample input:
000008000077;151139924603;3526358005322;2;29/07/2016:00:00:00;29/07/2018:00:00:00;20;4800019940

Sample output:

000008000077sfhsdfbs;151139924603;XXXXXXXXX;2;29/07/2016:00:00:00;29/07/2018:00:00:00;20;4800019940
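
The transformation in that sample (a suffix appended to the first field, the third field
masked) could be written, for instance, as a map function along these lines; the class name
is made up, and the suffix and mask strings are taken from the sample output:

++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple8;

// Illustrative only: appends a suffix to the first field and masks the third,
// mirroring the sample input/output above.
public class MaskFieldsSketch implements
        MapFunction<Tuple8<String, String, String, String, String, String, String, String>,
                Tuple8<String, String, String, String, String, String, String, String>> {

    private static final long serialVersionUID = 1L;

    @Override
    public Tuple8<String, String, String, String, String, String, String, String> map(
            Tuple8<String, String, String, String, String, String, String, String> in) {
        in.f0 = in.f0 + "sfhsdfbs";   // suffix taken from the sample output
        in.f2 = "XXXXXXXXX";          // mask the voucher number
        return in;
    }
}
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

It would plug into the pipeline sketch above as records.map(new MaskFieldsSketch()).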


Below is the code which I am currently running in a local environment:

++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

package com.ericsson.voucher;

import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple8;
import org.apache.flink.util.Collector;

public class Classification {

    private static final String OUTPUT_PATH = "C:\\Projects\\DM\\Pentaho\\OutPut\\output.csv";

    public static void main(String[] args) throws Exception {


        ExecutionEnvironment env = ExecutionEnvironment
                .getExecutionEnvironment();
        env.setParallelism(20);

        long subViewStartTime = System.currentTimeMillis();

        // Each line ends up as a single String field (the default ',' field
        // delimiter never occurs in the data), so it is split manually below.
        DataSet<Tuple1<String>> rawdata = env
                .readCsvFile("C:\\Projects\\DM\\Pentaho\\CRV_EXPORT.csv")
                .lineDelimiter("\n")
                .types(String.class);

        // Split each line into its eight fields and transform them.
        DataSet<Tuple8<String, String, String, String, String, String, String, String>> mails = rawdata
                .flatMap(new DataExtractor()).rebalance();

        // Write the result as one ';'-delimited CSV file (sink parallelism 1).
        mails.writeAsCsv(OUTPUT_PATH, "\n", ";").setParallelism(1);
        mails.print();

        long subViewEndTime = System.currentTimeMillis();
        long subViewDifference = subViewEndTime - subViewStartTime;
        System.out.println("The elapsed time is " + subViewDifference / 1000 + " seconds");

    }

    public static class DataExtractor
            extends
            RichFlatMapFunction<Tuple1<String>, Tuple8<String, String, String, String, String, String, String, String>> {

        private static final long serialVersionUID = 1L;

        @Override
        public void flatMap(
                Tuple1<String> paramIN,
                Collector<Tuple8<String, String, String, String, String, String, String, String>> out)
                throws Exception {
            // Split the raw line on ';' and emit the transformed fields.
            String[] lines = paramIN.f0.split(";");
            if (lines.length >= 8) {
                String voucherCode = lines[0] + "TEST1";
                String voucherId = lines[1];
                String voucherNumber = lines[2];
                String status = lines[3] + "TWTSTST";
                String startDate = lines[4];
                String endDate = lines[5];
                String endStatus = lines[6];
                String endVoucherNumber = lines[7];

                out.collect(new Tuple8<String, String, String, String, String, String, String, String>(
                        voucherCode, voucherId, voucherNumber, status,
                        startDate, endDate, endStatus, endVoucherNumber));
            }
        }

    }

    public static class RecordReducer
            implements
            GroupReduceFunction<Tuple8<String, String, String, String, String, String, String, String>,
                    Tuple8<String, String, String, String, String, String, String, String>> {

        private static final long serialVersionUID = -6045821605365596025L;

        @Override
        public void reduce(
                Iterable<Tuple8<String, String, String, String, String, String, String, String>> paramIterable,
                Collector<Tuple8<String, String, String, String, String, String, String, String>> paramCollector)
                throws Exception {
            // Forward every record of the group unchanged.
            for (Tuple8<String, String, String, String, String, String, String, String> m : paramIterable) {
                paramCollector.collect(m);
            }
        }
    }


}
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

Please help me with how I can achieve this partitioning of the fields in the above data and
use parallelism to increase the throughput of my application.



