From: Josh Elser
Date: Wed, 12 Aug 2015 12:03:01 -0400
To: user@accumulo.apache.org
Subject: Re: map-reduce workflows given index tables

Max Thomas wrote:
> Suppose you have a workflow like the following (hopefully not too
> uncommon):
>
> * "main table" - billions of rows, each with order of magnitude 100 columns
> * Ternary classifier that produces an annotation on each row in the main
> table. Suppose the labels are A, B, and C. Additionally, this analytic
> adds all labels to an index table, which is of the form:
>
> label : rowId

Can you clarify what you mean by "annotation on each row in the main table"? Another column in the row?

> to facilitate lookups of a particular type.
>
> Now, suppose you want to run another analytic over all rows with label
> A, preferably using MapReduce. It seems the options are:
>
> 1. Create a scanner which retrieves all As from the index table; add
> these row IDs to an AccumuloInputFormat job; launch a MapReduce job with
> a single map phase. Con: driver program will need a large amount of
> memory to hold all rows for the range list.

A single node/process doing all of this may become a bottleneck, depending on the number of Accumulo nodes in the system. You'd also need to be careful about batching things and making sure you don't run out of memory, as you outlined.

> 2. A MapReduce job over the index table, with a Reduce phase where each
> reducer has a collection of row IDs to iterate over. Each reducer then
> retrieves its assigned rows and runs over them.

This should work and is likely the easiest way to run this type of algorithm over multiple nodes. IMO, this is getting close to the point of using MapReduce as a hammer to drive a screw. It will work, but it's not an ideal solution.

> 3. Run over the entire main table with a naive filter to check
> classification type. Cons: hits every row, many of which aren't going to
> match.
>
> 4. AccumuloMultiTableFormat, Filters/Iterators - don't seem appropriate
> here
>
> It seems option #2 is ideal, with option #1 possibly working out too.
> But, I want to make sure I'm not missing something, as it doesn't seem
> possible to set up a workflow where the index table is hit, row IDs are
> retrieved, and these are then passed to another MapReduce job capable of
> hitting a different table via MapReduce (obviously one could create a
> BatchScanner given the inputs anywhere). Are there any examples that
> cover this? Or does anyone have a few suggestions about how to set up
> such a workflow?

I like to think of these in terms of producer/consumer problems.

0) Take the label and start a query over the index table
1) Produce row IDs from your index table
2) For each consumed row ID, run a query against the main table
3) Collect the rows from the main table query (transform/massage them for presentation)

Steps 1 and 2 can be decoupled from one another. As you get more row IDs, you shoot them over to some other process that takes them and runs a query against the main table. Trivially, you could do this with a thread pool, but that will still be limited to a single node's capabilities (which may or may not be sufficient). To scale beyond that, you could write something like a custom YARN application, with some messaging system (maybe Kafka?) acting as the transport between steps 1 and 2. It's a lot more work to write than a MapReduce job would be, but you have a lot more control over things and it's a bit more elegant IMO.
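
A minimal, single-node sketch of the thread-pool variant of steps 0 to 3, in plain Java with no Accumulo dependency. It illustrates the pattern under stated assumptions and is not Accumulo API usage: the index is a hypothetical in-memory `NavigableSet` of `label:rowId` strings standing in for the `label : rowId` index table, the main table is a plain `Map` from row ID to row contents, labels are assumed never to contain `:`, and `IndexLookupPipeline`/`rowsForLabel` are made-up names. Row IDs reach the consumers in fixed-size batches through a bounded queue, so the producer blocks instead of accumulating every row ID in memory (the concern raised with option 1).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch: in-memory stand-ins for the index and main tables.
// Assumes labels never contain ':' (the separator in "label:rowId" index entries).
class IndexLookupPipeline {
    // Unique sentinel batch that tells a consumer the producer is finished.
    private static final List<String> POISON = new ArrayList<>();

    static List<String> rowsForLabel(NavigableSet<String> index, Map<String, String> mainTable,
                                     String label, int batchSize, int consumers) {
        // Bounded queue: the producer blocks when consumers fall behind, so the
        // full set of row IDs is never buffered in memory at once.
        BlockingQueue<List<String>> queue = new ArrayBlockingQueue<>(2 * consumers);
        List<String> results = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newFixedThreadPool(consumers);
        List<Future<?>> futures = new ArrayList<>();
        try {
            // Steps 2 and 3: each consumer looks up a batch of row IDs in the main table.
            for (int i = 0; i < consumers; i++) {
                futures.add(pool.submit(() -> {
                    while (true) {
                        List<String> ids = queue.take();
                        if (ids == POISON) {
                            return null;
                        }
                        for (String rowId : ids) {
                            String row = mainTable.get(rowId);
                            if (row != null) {
                                results.add(row);
                            }
                        }
                    }
                }));
            }
            // Steps 0 and 1: scan the contiguous "label:" range of the sorted index
            // and hand row IDs to the consumers in batches of batchSize.
            String prefix = label + ":";
            List<String> batch = new ArrayList<>();
            for (String entry : index.tailSet(prefix, true)) {
                if (!entry.startsWith(prefix)) {
                    break; // sorted order: we are past this label's entries
                }
                batch.add(entry.substring(prefix.length()));
                if (batch.size() == batchSize) {
                    queue.put(batch);
                    batch = new ArrayList<>();
                }
            }
            if (!batch.isEmpty()) {
                queue.put(batch);
            }
            for (int i = 0; i < consumers; i++) {
                queue.put(POISON); // one shutdown signal per consumer
            }
            for (Future<?> f : futures) {
                f.get(); // wait for consumers and surface any failure
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
        List<String> sorted = new ArrayList<>(results);
        Collections.sort(sorted); // consumers finish in arbitrary order
        return sorted;
    }
}
```

Against a real cluster, the prefix scan would presumably be a Scanner over the index table and each batch lookup a BatchScanner over the main table; the queue-and-pool structure is the part that carries over, and replacing the in-process queue with a messaging system is the multi-node step described above.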

> Another answer might very well be: this is a wacky table/indexing setup,
> which I am very amenable to hearing. But to a naive Accumulo user,
> having an index table seems OK - I think it is also covered in the manual.

Index tables are a very good idea. You should use them :). Accumulo only provides a single sort order, and often that isn't sufficient for the types of queries you want to support over your data. A secondary index is an easy way to get a second sort order, and it works out well given the plentiful disk space on a typical Hadoop cluster.
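
To make the single-sort-order point concrete, here is a small in-memory model (hypothetical class `IndexedLabels`, plain Java, no Accumulo API) of a main table sorted by row ID plus an index that re-sorts the same annotations by label. Without the index, finding the rows with a given label means examining every row; with it, one label's rows form a single contiguous range. The sketch assumes labels never contain `:`, which it uses as the separator in index entries, and it also shows that relabeling a row has to delete the old index entry to keep the two tables consistent.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical in-memory model of a main table plus a secondary index table.
// Assumes labels never contain ':' (the separator used in index entries).
class IndexedLabels {
    // Main table: sorted by row ID, the only sort order it offers.
    private final TreeMap<String, String> labelByRow = new TreeMap<>();
    // Index table: the same annotations re-sorted by label, stored as "label:rowId".
    private final TreeSet<String> index = new TreeSet<>();

    // Write path: every annotation is written to both tables.
    void annotate(String rowId, String label) {
        String previous = labelByRow.put(rowId, label);
        if (previous != null) {
            index.remove(previous + ":" + rowId); // drop the stale index entry on relabel
        }
        index.add(label + ":" + rowId);
    }

    // Without the index: every row of the main table has to be examined.
    List<String> rowsByFullScan(String label) {
        List<String> rows = new ArrayList<>();
        labelByRow.forEach((rowId, rowLabel) -> {
            if (rowLabel.equals(label)) {
                rows.add(rowId);
            }
        });
        return rows;
    }

    // With the index: one label's entries form the contiguous range
    // ["label:", "label;"), because ';' is the character right after ':'.
    List<String> rowsByIndex(String label) {
        List<String> rows = new ArrayList<>();
        for (String entry : index.subSet(label + ":", label + ";")) {
            rows.add(entry.substring(label.length() + 1));
        }
        return rows;
    }
}
```

Both queries return the same row IDs in the same order; the difference is that `rowsByIndex` touches only the matching range, which is the space-for-time trade a secondary index makes.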