From: Flavio Pompermaier
Date: Tue, 4 Oct 2016 14:27:28 +0200
Subject: Re: Flink scala or Java - Dataload from CSV to database..
To: dev@flink.apache.org
Cc: user
Reply-To: user@flink.apache.org
MIME-Version: 1.0
Content-Type: multipart/alternative; boundary=001a113fba9e48e4ae053e093082
archived-at: Tue, 04 Oct 2016 12:27:54 -0000

--001a113fba9e48e4ae053e093082
Content-Type: text/plain; charset=UTF-8

I think you can start from this (using the Flink Table API); I hope it helps:

PS: maybe someone could write a blog post on how to do this with Scala, since it's a frequent question on the mailing list... :)

public static void main(String[] args) throws Exception {

    // CSV location and parsing options
    String path = "file:/tmp/myFile.csv";
    String rowDelim = CsvInputFormat.DEFAULT_LINE_DELIMITER;
    String fieldDelim = CsvInputFormat.DEFAULT_FIELD_DELIMITER;
    String[] fieldNames = "Column 1,Column 2,Column 3,Column4".split(fieldDelim);
    Character quoteCharacter = '"';
    boolean ignoreFirstLine = Boolean.TRUE;
    String ignoreComments = null;
    boolean lenient = Boolean.FALSE;

    // One type per CSV column, in the same order as fieldNames
    TypeInformation<?>[] fieldTypes = new TypeInformation<?>[] {
        BasicTypeInfo.STRING_TYPE_INFO,
        BasicTypeInfo.INT_TYPE_INFO,
        BasicTypeInfo.STRING_TYPE_INFO,
        BasicTypeInfo.STRING_TYPE_INFO
    };

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // Read the CSV file as a DataSet of rows
    CsvTableSource csvTableSource = new CsvTableSource(path, fieldNames, fieldTypes, fieldDelim,
        rowDelim, quoteCharacter, ignoreFirstLine, ignoreComments, lenient);
    DataSet<Row> csvDataSet = csvTableSource.getDataSet(env);

    // Write every row to the database through JDBC
    JDBCOutputFormat jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
        .setDrivername("org.postgresql.Driver")
        .setDBUrl("jdbc:postgresql://localhost/test?user=xxx&password=xxx")
        .setQuery("insert into %s (id, title, author, price, qty) values (?,?,?,?,?)")
        .finish();

    csvDataSet.output(jdbcOutputFormat);

    // Sinks are lazy: the job only runs once execute() is called
    env.execute();
}

Best,
Flavio

On Tue, Oct 4, 2016 at 2:18 PM, ram kumar wrote:
> Hi Guys,
>
> We are in the process of creating POC,
> I am looking for the sample project - Flink scala or java which can load
> data from database to database or
> CSV to relational database(any).
>
>
> CSV ----------> SQLSERVER ----------> AWS Redshift
>
> could you please some one help me on that..
>
> Cheers
> Ram
>

--001a113fba9e48e4ae053e093082
Content-Type: text/html; charset=UTF-8
Content-Transfer-Encoding: quoted-printable
--001a113fba9e48e4ae053e093082--
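
A note on the query string in the example above: the text passed to setQuery still contains a %s placeholder for the table name, and it lists five columns while the CSV source declares four fields. As far as I can tell, the output format hands the string to JDBC as it is, so the template has to be filled in beforehand and needs one ? per field. Below is a minimal plain-Java sketch of that step. The helper buildInsertQuery, the table name "books" and the column names are hypothetical and are not part of Flink or of the original message.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class InsertQueryBuilder {

    /**
     * Builds "insert into <table> (<columns>) values (?,...,?)" with exactly
     * one "?" marker per column. Table and column names are concatenated into
     * the SQL text, so they must be trusted constants, never user input.
     */
    public static String buildInsertQuery(String table, List<String> columns) {
        if (columns.isEmpty()) {
            throw new IllegalArgumentException("at least one column is required");
        }
        String columnList = String.join(", ", columns);
        String placeholders = String.join(",", Collections.nCopies(columns.size(), "?"));
        return String.format("insert into %s (%s) values (%s)", table, columnList, placeholders);
    }

    public static void main(String[] args) {
        // Hypothetical target table with four columns, matching the four CSV fields
        List<String> columns = Arrays.asList("id", "title", "author", "price");
        System.out.println(buildInsertQuery("books", columns));
        // prints: insert into books (id, title, author, price) values (?,?,?,?)
    }
}
```

The resulting string could then be passed to setQuery in place of the template. Deriving the markers from the column list keeps the number of parameters in step with the number of fields in each row.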