Subject: Re: Apache Flink transactions
From: Chiwan Park
Date: Fri, 05 Jun 2015 11:55:07 +0900
To: user@flink.apache.org

Flink's data model follows the functional programming model: every DataSet is immutable. This means we cannot modify a DataSet in place; we can only create a new DataSet with the modification applied. Update and delete queries are represented as writing out a new, filtered DataSet.

The following Scala sample shows select, insert, update, and remove queries in Flink. (I'm not sure this is best practice.)

import org.apache.flink.api.scala._

case class MyType(id: Int, value1: String, value2: String)

val env = ExecutionEnvironment.getExecutionEnvironment

// load data (you can use readCsvFile or something else)
val data = env.fromElements(MyType(0, "test", "test2"), MyType(1, "hello", "flink"), MyType(2, "flink", "good"))

// selecting
// same as SELECT * FROM data WHERE id = 1
val selectedData1 = data.filter(_.id == 1)
// same as SELECT value1 FROM data WHERE id = 1
val selectedData2 = data.filter(_.id == 1).map(_.value1)

// removing is the same as selecting, as in the following
// same as DELETE FROM data WHERE id = 1, but the DataSet data is not changed; the result is removedData
val removedData = data.filter(_.id != 1)

// inserting
// same as INSERT INTO data (id, value1, value2) VALUES (3, "new", "data")
val newData = env.fromElements(MyType(3, "new", "data"))
val insertedData = data.union(newData)

// updating
// same as UPDATE data SET value1 = "updated", value2 = "data" WHERE id = 1, but the DataSet data is not changed
val updatedData = data.map { x => if (x.id == 1) MyType(x.id, "updated", "data") else x }

Regards,
Chiwan Park

> On Jun 5, 2015, at 9:22 AM, hawin wrote:
>
> Hi Chiwan
>
> Thanks for your information. I know Flink is not a DBMS. I want to know what
> the Flink way is to select, insert, update, and delete data on HDFS.
>
> @Till
> Maybe union is a way to insert data. But I think it will come at some
> performance cost.
>
> @Stephan
> Thanks for your suggestion. I have checked the Apache Flink roadmap. SQL on
> Flink will be released in Q3/Q4 2015. Will it support inserting, deleting,
> and updating data on HDFS?
> You guys already provided a nice example for selecting data on HDFS.
> Such as: TPCHQuery10 and TPCHQuery3.
> Do you have other examples for inserting, updating, and removing data on HDFS
> with Apache Flink?
>
> Thanks
>
> --
> View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Re-Apache-Flink-transactions-tp1457p1491.html
> Sent from the Apache Flink User Mailing List archive at Nabble.com.
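
To make the "writing out a new DataSet" part concrete for data on HDFS, here is a minimal sketch, not a definitive recipe. It assumes the records are stored as CSV, and the two paths are hypothetical placeholders. The object name RewriteOnHdfs and the helper applyUpdate are made up for illustration. It uses readCsvFile and writeAsCsv from the Scala DataSet API as I understand them, and it writes to a separate output path instead of touching the input.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem.WriteMode

case class MyType(id: Int, value1: String, value2: String)

object RewriteOnHdfs {
  // Hypothetical helper: the "UPDATE ... WHERE id = 1" step as a pure function.
  def applyUpdate(x: MyType): MyType =
    if (x.id == 1) x.copy(value1 = "updated", value2 = "data") else x

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Hypothetical input path; assumes one CSV record per line.
    val data = env.readCsvFile[MyType]("hdfs:///path/to/input")

    // "DELETE WHERE id = 2" followed by "UPDATE ... WHERE id = 1".
    // Neither step changes `data`; each one yields a new DataSet.
    val result = data.filter(_.id != 2).map(x => applyUpdate(x))

    // Hypothetical output path; NO_OVERWRITE refuses to replace existing files.
    result.writeAsCsv("hdfs:///path/to/output", writeMode = WriteMode.NO_OVERWRITE)

    // Nothing is read or written until the job is executed.
    env.execute("rewrite data set")
  }
}
```

The old files stay as they are, so whatever reads the data afterwards has to be pointed at the new output path.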