Subject: Re: spark to hbase
From: Deng Ching-Mallete
To: jinhong lu
Cc: spark users
Date: Tue, 27 Oct 2015 18:21:37 +0800

Hi,

It would be more efficient if you configure the table and flush the commits per partition instead of per element in the RDD. The latter works fine here because you only have 4 elements, but it won't bode well for large data sets IMO.
Thanks,
Deng

On Tue, Oct 27, 2015 at 5:22 PM, jinhong lu wrote:
>
> Hi,
>
> I write my result to hdfs, and it works well:
>
> val model = lines.map(pairFunction).groupByKey().flatMap(pairFlatMapFunction).aggregateByKey(new TrainFeature())(seqOp, combOp).values
> model.map(a => (a.toKey() + "\t" + a.totalCount + "\t" + a.positiveCount)).saveAsTextFile(modelDataPath);
>
> But when I want to write to hbase, the application hangs: no log, no
> response, it just stays there, and nothing is written to hbase:
>
> val model = lines.map(pairFunction).groupByKey().flatMap(pairFlatMapFunction).aggregateByKey(new TrainFeature())(seqOp, combOp).values.foreach({ res =>
>   val configuration = HBaseConfiguration.create();
>   configuration.set("hbase.zookeeper.property.clientPort", "2181");
>   configuration.set("hbase.zookeeper.quorum", "192.168.1.66");
>   configuration.set("hbase.master", "192.168.1:60000");
>   val hadmin = new HBaseAdmin(configuration);
>   val table = new HTable(configuration, "ljh_test3");
>   var put = new Put(Bytes.toBytes(res.toKey()));
>   put.add(Bytes.toBytes("f"), Bytes.toBytes("c"), Bytes.toBytes(res.totalCount + res.positiveCount));
>   table.put(put);
>   table.flushCommits()
> })
>
> And then I tried to write some simple data to hbase, and it worked well too:
>
> sc.parallelize(Array(1,2,3,4)).foreach({ res =>
>   val configuration = HBaseConfiguration.create();
>   configuration.set("hbase.zookeeper.property.clientPort", "2181");
>   configuration.set("hbase.zookeeper.quorum", "192.168.1.66");
>   configuration.set("hbase.master", "192.168.1:60000");
>   val hadmin = new HBaseAdmin(configuration);
>   val table = new HTable(configuration, "ljh_test3");
>   var put = new Put(Bytes.toBytes(res));
>   put.add(Bytes.toBytes("f"), Bytes.toBytes("c"), Bytes.toBytes(res));
>   table.put(put);
>   table.flushCommits()
> })
>
> What is the problem with the 2nd code? Thanks a lot.
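For reference, the per-partition approach suggested above could be sketched roughly like this. This is only a sketch, not tested code: it reuses the class, table, and column names from the quoted snippet (TrainFeature, "ljh_test3", families "f"/"c") and assumes the same HBase 0.98/1.x-era client API (HTable, flushCommits) the original code uses.

```scala
// Sketch: one HBase connection and one flush per partition,
// instead of per RDD element. Imports assumed to match the quoted code:
//   org.apache.hadoop.hbase.HBaseConfiguration
//   org.apache.hadoop.hbase.client.{HTable, Put}
//   org.apache.hadoop.hbase.util.Bytes
model.foreachPartition { iter =>
  val configuration = HBaseConfiguration.create()
  configuration.set("hbase.zookeeper.property.clientPort", "2181")
  configuration.set("hbase.zookeeper.quorum", "192.168.1.66")
  val table = new HTable(configuration, "ljh_test3")
  table.setAutoFlush(false, false) // buffer puts, flush manually below
  iter.foreach { res =>
    val put = new Put(Bytes.toBytes(res.toKey()))
    put.add(Bytes.toBytes("f"), Bytes.toBytes("c"),
      Bytes.toBytes(res.totalCount + res.positiveCount))
    table.put(put)
  }
  table.flushCommits() // one flush per partition, not per element
  table.close()
}
```

This keeps connection setup and flushing proportional to the number of partitions rather than the number of records.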