Subject: Re: Sourcing data from RedShift
From: Gary Malouf <malouf.gary@gmail.com>
To: Xiangrui Meng <meng@databricks.com>
Cc: Michael Armbrust <michael@databricks.com>, user@spark.apache.org
Date: Fri, 14 Nov 2014 21:29:19 -0500

I'll try this out and follow up with what I find.

On Fri, Nov 14, 2014 at 8:54 PM, Xiangrui Meng wrote:

> For each node, if the CSV reader is implemented efficiently, you should be
> able to hit at least half of the theoretical network bandwidth, which is
> about 60MB/second/node. So if you just do counting, the expected time
> should be within 3 minutes.
>
> Note that your cluster has 15GB * 12 = 180GB of RAM in total. If you use
> the default spark.storage.memoryFraction, it can barely cache 100GB of
> data, not counting the overhead. So if your operation needs to cache the
> data to be efficient, you may need a larger cluster or change the storage
> level to MEMORY_AND_DISK.
>
> -Xiangrui
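[Editor's note: a minimal sketch (Scala, Spark 1.x) of the storage-level change suggested above. The S3 path, app name, and RDD name are placeholders, not details from the thread.]

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.storage.StorageLevel

    val sc = new SparkContext(new SparkConf().setAppName("csv-count"))

    // Placeholder path for the ~100GB of CSV text currently read from S3.
    val records = sc.textFile("s3n://<KEY>:<SECRET>@my-bucket/csv-data/")

    // With spark.storage.memoryFraction at its 1.x default (0.6), 12 x 15GB
    // of executor memory leaves roughly 100GB for cached blocks, so persist
    // with a level that spills partitions to disk instead of dropping them.
    records.persist(StorageLevel.MEMORY_AND_DISK)
    println(records.count())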
> On Nov 14, 2014, at 5:32 PM, Gary Malouf wrote:
>
> Hmm, we actually read the CSV data from S3 now and were looking to avoid
> that. Unfortunately, we've experienced dreadful performance reading 100GB
> of text data for a job directly from S3 - our hope had been that connecting
> directly to Redshift would provide some boost.
>
> We had been using 12 m3.xlarges, but increasing default parallelism (to 2x
> the number of CPUs across the cluster) and increasing partitions during
> reading did not seem to help.
>
> On Fri, Nov 14, 2014 at 6:51 PM, Xiangrui Meng wrote:
>
>> Michael is correct. Using a direct connection to dump data would be slow
>> because there is only a single connection. Please use UNLOAD with the
>> ESCAPE option to dump the table to S3. See the instructions at
>>
>> http://docs.aws.amazon.com/redshift/latest/dg/r_UNLOAD.html
>>
>> And then load the files back using the Redshift input format we wrote:
>> https://github.com/databricks/spark-redshift (we moved the
>> implementation to github/databricks). Right now all columns are loaded as
>> string columns, and you need to do the type casting manually. We plan to
>> add a parser that can translate a Redshift table schema directly to a
>> Spark SQL schema, but there is no ETA yet.
>>
>> -Xiangrui
>>
>> On Nov 14, 2014, at 3:46 PM, Michael Armbrust wrote:
>>
>> I'd guess that it's an s3n://key:secret_key@bucket/path from the UNLOAD
>> command used to produce the data. Xiangrui can correct me if I'm wrong
>> though.
>>
>> On Fri, Nov 14, 2014 at 2:19 PM, Gary Malouf wrote:
>>
>>> We have a bunch of data in Redshift tables that we'd like to pull into
>>> Spark during job runs. What is the path/URL format one uses to pull data
>>> from there? (This is in reference to using
>>> https://github.com/mengxr/redshift-input-format.)
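[Editor's note: a rough sketch of the UNLOAD-then-load workflow described in the quoted messages above. The table name, bucket, prefix, and credentials are placeholders, and the RedshiftInputFormat class and key/value types are assumptions based on the project README at the time; verify them against https://github.com/databricks/spark-redshift before use.]

    // Step 1 (run in Redshift, not Spark): dump the table to S3 with ESCAPE.
    //
    //   UNLOAD ('SELECT * FROM my_table')
    //   TO 's3://my-bucket/my_table_dump/part_'
    //   CREDENTIALS 'aws_access_key_id=<KEY>;aws_secret_access_key=<SECRET>'
    //   ESCAPE;

    // Step 2 (Spark): read the dump back; every column arrives as a string.
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.SparkContext._
    import com.databricks.spark.redshift.RedshiftInputFormat  // class name assumed, see README

    val sc = new SparkContext(new SparkConf().setAppName("redshift-load"))

    val records = sc.newAPIHadoopFile(
      "s3n://<KEY>:<SECRET>@my-bucket/my_table_dump/",  // the path format Michael describes
      classOf[RedshiftInputFormat],
      classOf[java.lang.Long],        // key/value types assumed from the README
      classOf[Array[String]])

    // Manual type casting, as noted above: all fields come back as strings.
    val firstColumnAsLong = records.values.map(fields => fields(0).toLong)
    println(firstColumnAsLong.count())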