Return-Path: X-Original-To: apmail-incubator-accumulo-commits-archive@minotaur.apache.org Delivered-To: apmail-incubator-accumulo-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 4BD4E9FFD for ; Thu, 16 Feb 2012 19:54:56 +0000 (UTC) Received: (qmail 12690 invoked by uid 500); 16 Feb 2012 19:54:56 -0000 Delivered-To: apmail-incubator-accumulo-commits-archive@incubator.apache.org Received: (qmail 12664 invoked by uid 500); 16 Feb 2012 19:54:56 -0000 Mailing-List: contact accumulo-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: accumulo-dev@incubator.apache.org Delivered-To: mailing list accumulo-commits@incubator.apache.org Received: (qmail 12657 invoked by uid 99); 16 Feb 2012 19:54:56 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 16 Feb 2012 19:54:56 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 16 Feb 2012 19:54:53 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 4D1AE23889B8; Thu, 16 Feb 2012 19:54:32 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1245142 - in /incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch: ingest/WikipediaPartitionedMapper.java output/BufferingRFileRecordWriter.java Date: Thu, 16 Feb 2012 19:54:32 -0000 To: accumulo-commits@incubator.apache.org From: afuchs@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120216195432.4D1AE23889B8@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: afuchs Date: Thu Feb 16 19:54:31 2012 New Revision: 1245142 URL: http://svn.apache.org/viewvc?rev=1245142&view=rev Log: ACCUMULO-375 hybridized ingest to use some bulk and some streaming Modified: incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedMapper.java incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/BufferingRFileRecordWriter.java Modified: incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedMapper.java URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedMapper.java?rev=1245142&r1=1245141&r2=1245142&view=diff ============================================================================== --- incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedMapper.java (original) +++ incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedMapper.java Thu Feb 16 19:54:31 2012 @@ -26,6 +26,9 @@ import java.util.HashSet; import java.util.Map.Entry; import java.util.Set; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.MultiTableBatchWriter; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.security.ColumnVisibility; @@ -112,6 +115,7 @@ public class WikipediaPartitionedMapper } } + MultiTableBatchWriter mtbw; @Override public void setup(final Context context) { @@ -121,6 +125,14 @@ public class WikipediaPartitionedMapper reverseIndexTableName = new Text(tablename + "ReverseIndex"); metadataTableName = new Text(tablename + "Metadata"); + try { + mtbw = WikipediaConfiguration.getConnector(conf).createMultiTableBatchWriter(10000000, 1000, 10); + } catch (AccumuloException e) { + throw new RuntimeException(e); + } catch (AccumuloSecurityException e) { + throw new RuntimeException(e); + } + final Text metadataTableNameFinal = metadataTableName; final Text indexTableNameFinal = indexTableName; final Text reverseIndexTableNameFinal = reverseIndexTableName; @@ -163,7 +175,7 @@ public class WikipediaPartitionedMapper Mutation m = new Mutation(key.row); m.put(key.colfam, key.colqual, key.cv, key.timestamp, val); try { - context.write(indexTableNameFinal, m); + mtbw.getBatchWriter(indexTableNameFinal.toString()).addMutation(m); } catch (Exception e) { throw new RuntimeException(e); } @@ -189,7 +201,7 @@ public class WikipediaPartitionedMapper Mutation m = new Mutation(key.row); m.put(key.colfam, key.colqual, key.cv, key.timestamp, val); try { - context.write(reverseIndexTableNameFinal, m); + mtbw.getBatchWriter(reverseIndexTableNameFinal.toString()).addMutation(m); } catch (Exception e) { throw new RuntimeException(e); } @@ -210,7 +222,7 @@ public class WikipediaPartitionedMapper Mutation m = new Mutation(key.row); m.put(key.colfam, key.colqual, key.cv, key.timestamp, value); try { - context.write(metadataTableNameFinal, m); + mtbw.getBatchWriter(metadataTableNameFinal.toString()).addMutation(m); } catch (Exception e) { throw new RuntimeException(e); } Modified: incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/BufferingRFileRecordWriter.java URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/BufferingRFileRecordWriter.java?rev=1245142&r1=1245141&r2=1245142&view=diff ============================================================================== --- incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/BufferingRFileRecordWriter.java (original) +++ incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/BufferingRFileRecordWriter.java Thu Feb 16 19:54:31 2012 @@ -69,8 +69,8 @@ final class BufferingRFileRecordWriter e if (buffer.size() == 0) return; - // TODO fix the filename String file = filenamePrefix + "/" + tablename + "/" + taskID + "_" + (fileCount++) + ".rf"; + // TODO get the table configuration for the given table? FileSKVWriter writer = RFileOperations.getInstance().openWriter(file, fs, conf, acuconf); // forget locality groups for now, just write everything to the default @@ -110,17 +110,18 @@ final class BufferingRFileRecordWriter e { Key k = new Key(mutation.getRow(),update.getColumnFamily(),update.getColumnQualifier(),update.getColumnVisibility(),update.getTimestamp(),update.isDeleted()); Value v = new Value(update.getValue()); + // TODO account for object overhead mutationSize += k.getSize(); mutationSize += v.getSize(); buffer.put(k, v); } size += mutationSize; long bufferSize = bufferSizes.get(table); + + // TODO use a MutableLong instead bufferSize += mutationSize; bufferSizes.put(table, bufferSize); - - // TODO add object overhead size - + while (size >= maxSize) { flushLargestTable(); }