Subject: Re: hadoop input/output format advanced control
From: Imran Rashid
Date: Wed, 25 Mar 2015 15:42:32 -0500
To: Nick Pentreath
Cc: dev@spark.apache.org

Hi Nick,

I don't remember the exact details of these scenarios, but I think the user
wanted a lot more control over how the files got grouped into partitions --
they wanted to group the files together by some arbitrary function. I didn't
think that was possible with CombineFileInputFormat, but maybe there is a way?
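For the simpler case of just packing many small files into fewer partitions,
a rough sketch of the copy-the-conf approach might look like the following
(untested; it assumes a Hadoop 2.x classpath and a SparkContext named sc, and
the path and split-size value are only placeholders):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat

    // Copy the context-wide Hadoop conf so the override applies only to this RDD.
    val conf = new Configuration(sc.hadoopConfiguration)
    // Upper bound on how many bytes CombineTextInputFormat packs into one
    // combined split; 128 MB is just an illustrative value.
    conf.set("mapreduce.input.fileinputformat.split.maxsize", "134217728")

    // Many small files get grouped into a single partition per combined split.
    val grouped = sc.newAPIHadoopFile(
      "/some/path",
      classOf[CombineTextInputFormat],
      classOf[LongWritable],
      classOf[Text],
      conf)

That handles size-based grouping, but as far as I can tell it still doesn't
let you group files by an arbitrary function, which is what the user was after.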
thanks

On Tue, Mar 24, 2015 at 1:50 PM, Nick Pentreath wrote:

> Imran, on your point about reading multiple files together in a partition,
> is it not simpler to use the approach of copying the Hadoop conf and setting
> per-RDD settings for min split to control the input size per partition,
> together with something like CombineFileInputFormat?
>
> On Tue, Mar 24, 2015 at 5:28 PM, Imran Rashid wrote:
>
> > I think this would be a great addition, and I totally agree that you need
> > to be able to set these at a finer context than just the SparkContext.
> >
> > Just to play devil's advocate, though -- the alternative is for you to
> > just subclass HadoopRDD yourself, or make a totally new RDD, and then you
> > could expose whatever you need. Why is this solution better? IMO the
> > criteria are:
> > (a) common operations
> > (b) error-prone / difficult to implement
> > (c) non-obvious, but important for performance
> >
> > I think this case fits (a) & (c), so I think it's still worthwhile. But
> > it's also worth asking whether or not it's too difficult for a user to
> > extend HadoopRDD right now. There have been several cases in the past week
> > where we've suggested that a user should read from HDFS themselves (e.g.,
> > to read multiple files together in one partition) -- with*out* reusing the
> > code in HadoopRDD, though they would lose things like the metric tracking
> > & preferred locations you get from HadoopRDD. Does HadoopRDD need some
> > refactoring to make that easier to do? Or do we just need a good example?
> >
> > Imran
> >
> > (sorry for hijacking your thread, Koert)
> >
> > On Mon, Mar 23, 2015 at 3:52 PM, Koert Kuipers wrote:
> >
> > > See email below. Reynold suggested I send it to dev instead of user.
> > >
> > > ---------- Forwarded message ----------
> > > From: Koert Kuipers
> > > Date: Mon, Mar 23, 2015 at 4:36 PM
> > > Subject: hadoop input/output format advanced control
> > > To: "user@spark.apache.org"
> > >
> > > Currently it's pretty hard to control the Hadoop Input/Output formats
> > > used in Spark. The convention seems to be to add extra parameters to all
> > > methods and then somewhere deep inside the code (for example in
> > > PairRDDFunctions.saveAsHadoopFile) all these parameters get translated
> > > into settings on the Hadoop Configuration object.
> > >
> > > For example, for compression I see "codec: Option[Class[_ <:
> > > CompressionCodec]] = None" added to a bunch of methods.
> > >
> > > How scalable is this solution, really?
> > >
> > > For example, I need to read from a Hadoop dataset and I don't want the
> > > input (part) files to get split up. The way to do this is to set
> > > "mapred.min.split.size". Now I don't want to set this at the level of
> > > the SparkContext (which can be done), since I don't want it to apply to
> > > input formats in general -- I want it to apply to just this one specific
> > > input dataset I need to read. Which leaves me with no options currently.
> > > I could go add yet another input parameter to all the methods
> > > (SparkContext.textFile, SparkContext.hadoopFile, SparkContext.objectFile,
> > > etc.), but that seems ineffective.
> > >
> > > Why can we not expose a Map[String, String] or some other generic way to
> > > manipulate settings for Hadoop input/output formats? It would require
> > > adding one more parameter to all methods that deal with Hadoop
> > > input/output formats, but after that it's done. One parameter to rule
> > > them all....
> > >
> > > Then I could do:
> > > val x = sc.textFile("/some/path", formatSettings =
> > >   Map("mapred.min.split.size" -> "12345"))
> > >
> > > or
> > > rdd.saveAsTextFile("/some/path", formatSettings =
> > >   Map("mapred.output.compress" -> "true",
> > >     "mapred.output.compression.codec" -> "somecodec"))
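For comparison, a rough sketch of what such a formatSettings map could amount
to today, using the existing conf-taking read methods (untested; the helper
name withFormatSettings is made up purely for illustration, and it assumes a
SparkContext named sc, e.g. in spark-shell):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
    import org.apache.spark.SparkContext

    // Hypothetical helper: overlay per-dataset settings on a *copy* of the
    // context-wide Hadoop conf, so they never leak into other reads or writes.
    def withFormatSettings(sc: SparkContext,
                           settings: Map[String, String]): Configuration = {
      val conf = new Configuration(sc.hadoopConfiguration)
      settings.foreach { case (k, v) => conf.set(k, v) }
      conf
    }

    // Per-dataset minimum split size; the key is the new-API spelling of
    // "mapred.min.split.size", and 12345 is just the value from the example above.
    val conf = withFormatSettings(sc,
      Map("mapreduce.input.fileinputformat.split.minsize" -> "12345"))
    val raw = sc.newAPIHadoopFile(
      "/some/path",
      classOf[TextInputFormat],
      classOf[LongWritable],
      classOf[Text],
      conf)
    val x = raw.map(_._2.toString)  // same element type as sc.textFile

On the write side, PairRDDFunctions.saveAsNewAPIHadoopFile likewise accepts a
Configuration, so the compression settings in the example above could be
scoped to a single save in the same way.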