Return-Path: X-Original-To: apmail-accumulo-commits-archive@www.apache.org Delivered-To: apmail-accumulo-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 3CE419232 for ; Tue, 12 Jun 2012 20:21:35 +0000 (UTC) Received: (qmail 83206 invoked by uid 500); 12 Jun 2012 20:21:35 -0000 Delivered-To: apmail-accumulo-commits-archive@accumulo.apache.org Received: (qmail 83167 invoked by uid 500); 12 Jun 2012 20:21:34 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 83158 invoked by uid 99); 12 Jun 2012 20:21:34 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 12 Jun 2012 20:21:34 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 12 Jun 2012 20:21:32 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 612D22388980 for ; Tue, 12 Jun 2012 20:21:12 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1349503 - in /accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis: ./ FilterMeta.java FindTablet.java IndexMeta.java LogFileInputFormat.java LogFileOutputFormat.java PrintEvents.java package-info.java Date: Tue, 12 Jun 2012 20:21:12 -0000 To: commits@accumulo.apache.org From: kturner@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120612202112.612D22388980@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: kturner Date: Tue Jun 12 20:21:11 2012 New Revision: 1349503 URL: http://svn.apache.org/viewvc?rev=1349503&view=rev Log: ACCUMULO-549 initial checkin of a set of utilities for indexing and analyzing metadata table mutations found in write ahead logs Added: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/FilterMeta.java accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/FindTablet.java accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/IndexMeta.java accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/LogFileInputFormat.java accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/LogFileOutputFormat.java accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/PrintEvents.java accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/package-info.java Added: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/FilterMeta.java URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/FilterMeta.java?rev=1349503&view=auto ============================================================================== --- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/FilterMeta.java (added) +++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/FilterMeta.java Tue Jun 12 20:21:11 2012 @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.metanalysis; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.util.CachedConfiguration; +import org.apache.accumulo.server.logger.LogEvents; +import org.apache.accumulo.server.logger.LogFileKey; +import org.apache.accumulo.server.logger.LogFileValue; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +/** + * A map reduce job that takes a set of walogs and filters out all non metadata table events. + */ +public class FilterMeta extends Configured implements Tool { + + public static class FilterMapper extends Mapper { + private Set tabletIds; + + @Override + protected void setup(Context context) throws java.io.IOException, java.lang.InterruptedException { + tabletIds = new HashSet(); + } + + @Override + public void map(LogFileKey key, LogFileValue value, Context context) throws IOException, InterruptedException { + if (key.event == LogEvents.OPEN) { + context.write(key, value); + } else if (key.event == LogEvents.DEFINE_TABLET && key.tablet.getTableId().toString().equals(Constants.METADATA_TABLE_ID)) { + tabletIds.add(key.tid); + context.write(key, value); + } else if ((key.event == LogEvents.MUTATION || key.event == LogEvents.MANY_MUTATIONS) && tabletIds.contains(key.tid)) { + context.write(key, value); + } + } + } + + @Override + public int run(String[] args) throws Exception { + + String jobName = this.getClass().getSimpleName() + "_" + System.currentTimeMillis(); + + Job job = new Job(getConf(), jobName); + job.setJarByClass(this.getClass()); + + Path paths[] = new Path[args.length - 1]; + for (int i = 0; i < paths.length; i++) { + paths[i] = new Path(args[i]); + } + + job.setInputFormatClass(LogFileInputFormat.class); + LogFileInputFormat.setInputPaths(job, paths); + + job.setOutputFormatClass(LogFileOutputFormat.class); + LogFileOutputFormat.setOutputPath(job, new Path(args[args.length - 1])); + + job.setMapperClass(FilterMapper.class); + + job.setNumReduceTasks(0); + + job.waitForCompletion(true); + return job.isSuccessful() ? 0 : 1; + } + + public static void main(String[] args) throws Exception { + int res = ToolRunner.run(CachedConfiguration.getInstance(), new FilterMeta(), args); + System.exit(res); + } +} Added: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/FindTablet.java URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/FindTablet.java?rev=1349503&view=auto ============================================================================== --- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/FindTablet.java (added) +++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/FindTablet.java Tue Jun 12 20:21:11 2012 @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.metanalysis; + +import java.util.Map.Entry; + +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.KeyExtent; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.util.TextUtil; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.GnuParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.hadoop.io.Text; + +/** + * Finds tablet creation events. + */ +public class FindTablet { + public static void main(String[] args) throws Exception { + + Options options = new Options(); + options.addOption("r", "row", true, "find tablets that contain this row"); + + GnuParser parser = new GnuParser(); + CommandLine cmd = null; + try { + cmd = parser.parse(options, args); + if (cmd.getArgs().length != 5) { + throw new ParseException("Command takes no arguments"); + } + } catch (ParseException e) { + System.err.println("Failed to parse command line " + e.getMessage()); + System.err.println(); + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp(FindTablet.class.getSimpleName() + " ", options); + System.exit(-1); + } + + String instance = cmd.getArgs()[0]; + String zookeepers = cmd.getArgs()[1]; + String user = cmd.getArgs()[2]; + String pass = cmd.getArgs()[3]; + String tableID = cmd.getArgs()[4]; + + ZooKeeperInstance zki = new ZooKeeperInstance(instance, zookeepers); + Connector conn = zki.getConnector(user, pass); + + if (cmd.hasOption('r')) { + findContainingTablets(conn, tableID, cmd.getOptionValue('r')); + } else { + System.err.println("ERROR : No search criteria given"); + } + } + + /** + * @param conn + * @param tablePrefix + * @param tableID + * @param option + */ + private static void findContainingTablets(Connector conn, String tableID, String row) throws Exception { + Range range = new KeyExtent(new Text(tableID), null, null).toMetadataRange(); + + Scanner scanner = conn.createScanner("createEvents", new Authorizations()); + + scanner.setRange(range); + + for (Entry entry : scanner) { + KeyExtent ke = new KeyExtent(entry.getKey().getRow(), new Value(TextUtil.getBytes(entry.getKey().getColumnFamily()))); + if (ke.contains(new Text(row))) { + System.out.println(entry.getKey().getColumnQualifier() + " " + ke + " " + entry.getValue()); + } + } + } +} Added: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/IndexMeta.java URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/IndexMeta.java?rev=1349503&view=auto ============================================================================== --- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/IndexMeta.java (added) +++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/IndexMeta.java Tue Jun 12 20:21:11 2012 @@ -0,0 +1,176 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.metanalysis; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.TableExistsException; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat; +import org.apache.accumulo.core.data.ColumnUpdate; +import org.apache.accumulo.core.data.KeyExtent; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.util.CachedConfiguration; +import org.apache.accumulo.server.logger.LogEvents; +import org.apache.accumulo.server.logger.LogFileKey; +import org.apache.accumulo.server.logger.LogFileValue; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.WritableUtils; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.apache.log4j.Logger; + +/** + * A map reduce job that takes write ahead logs containing mutations for the metadata table and indexes them into Accumulo tables for analysis. + * + */ + +public class IndexMeta extends Configured implements Tool { + + public static class IndexMapper extends Mapper { + private static final Text CREATE_EVENTS_TABLE = new Text("createEvents"); + private static final Text TABLET_EVENTS_TABLE = new Text("tabletEvents"); + private Map tabletIds = new HashMap(); + private String uuid = null; + + @Override + protected void setup(Context context) throws java.io.IOException, java.lang.InterruptedException { + tabletIds = new HashMap(); + uuid = null; + } + + @Override + public void map(LogFileKey key, LogFileValue value, Context context) throws IOException, InterruptedException { + if (key.event == LogEvents.OPEN) { + uuid = key.tserverSession; + } else if (key.event == LogEvents.DEFINE_TABLET) { + if (key.tablet.getTableId().toString().equals(Constants.METADATA_TABLE_ID)) { + tabletIds.put(key.tid, new KeyExtent(key.tablet)); + } + } else if ((key.event == LogEvents.MUTATION || key.event == LogEvents.MANY_MUTATIONS) && tabletIds.containsKey(key.tid)) { + for (Mutation m : value.mutations) { + index(context, m, uuid, tabletIds.get(key.tid)); + } + } + } + + void index(Context context, Mutation m, String logFile, KeyExtent metaTablet) throws IOException, InterruptedException { + List columnsUpdates = m.getUpdates(); + + Text prevRow = null; + long timestamp = 0; + + if (m.getRow().length > 0 && m.getRow()[0] == '~') { + return; + } + + for (ColumnUpdate cu : columnsUpdates) { + if (Constants.METADATA_PREV_ROW_COLUMN.equals(new Text(cu.getColumnFamily()), new Text(cu.getColumnQualifier())) && !cu.isDeleted()) { + prevRow = new Text(cu.getValue()); + } + + timestamp = cu.getTimestamp(); + } + + byte[] serMut = WritableUtils.toByteArray(m); + + if (prevRow != null) { + Mutation createEvent = new Mutation(new Text(m.getRow())); + createEvent.put(prevRow, new Text(String.format("%020d", timestamp)), new Value(metaTablet.toString().getBytes())); + context.write(CREATE_EVENTS_TABLE, createEvent); + } + + Mutation tabletEvent = new Mutation(new Text(m.getRow())); + tabletEvent.put(new Text(String.format("%020d", timestamp)), new Text("mut"), new Value(serMut)); + tabletEvent.put(new Text(String.format("%020d", timestamp)), new Text("mtab"), new Value(metaTablet.toString().getBytes())); + tabletEvent.put(new Text(String.format("%020d", timestamp)), new Text("log"), new Value(logFile.getBytes())); + context.write(TABLET_EVENTS_TABLE, tabletEvent); + } + } + + + + @Override + public int run(String[] args) throws Exception { + if (args.length < 5) { + System.err.println("Usage : " + IndexMeta.class + " {}"); + return -1; + } + + String instance = args[0]; + String zookeepers = args[1]; + String user = args[2]; + String pass = args[3]; + + String jobName = this.getClass().getSimpleName() + "_" + System.currentTimeMillis(); + + Job job = new Job(getConf(), jobName); + job.setJarByClass(this.getClass()); + + List logFiles = Arrays.asList(args).subList(4, args.length); + Path paths[] = new Path[logFiles.size()]; + int count = 0; + for (String logFile : logFiles) { + paths[count++] = new Path(logFile); + } + + job.setInputFormatClass(LogFileInputFormat.class); + LogFileInputFormat.setInputPaths(job, paths); + + job.setNumReduceTasks(0); + + job.setOutputFormatClass(AccumuloOutputFormat.class); + AccumuloOutputFormat.setZooKeeperInstance(job.getConfiguration(), instance, zookeepers); + AccumuloOutputFormat.setOutputInfo(job.getConfiguration(), user, pass.getBytes(), false, null); + + job.setMapperClass(IndexMapper.class); + + ZooKeeperInstance zki = new ZooKeeperInstance(instance, zookeepers); + Connector conn = zki.getConnector(user, pass); + + try { + conn.tableOperations().create("createEvents"); + } catch (TableExistsException tee) { + Logger.getLogger(IndexMeta.class).warn("Table createEvents exists"); + } + + try { + conn.tableOperations().create("tabletEvents"); + } catch (TableExistsException tee) { + Logger.getLogger(IndexMeta.class).warn("Table tabletEvents exists"); + } + + job.waitForCompletion(true); + return job.isSuccessful() ? 0 : 1; + } + + public static void main(String[] args) throws Exception { + int res = ToolRunner.run(CachedConfiguration.getInstance(), new IndexMeta(), args); + System.exit(res); + } +} Added: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/LogFileInputFormat.java URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/LogFileInputFormat.java?rev=1349503&view=auto ============================================================================== --- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/LogFileInputFormat.java (added) +++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/LogFileInputFormat.java Tue Jun 12 20:21:11 2012 @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.accumulo.server.metanalysis; + +import java.io.EOFException; +import java.io.IOException; + +import org.apache.accumulo.server.logger.LogFileKey; +import org.apache.accumulo.server.logger.LogFileValue; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; + +/** + * Input format for Accumulo write ahead logs + */ +public class LogFileInputFormat extends FileInputFormat { + + private static class LogFileRecordReader extends RecordReader { + + private FSDataInputStream fsdis; + private LogFileKey key; + private LogFileValue value; + private long length; + + @Override + public void close() throws IOException { + fsdis.close(); + } + + @Override + public LogFileKey getCurrentKey() throws IOException, InterruptedException { + return key; + } + + @Override + public LogFileValue getCurrentValue() throws IOException, InterruptedException { + return value; + } + + @Override + public float getProgress() throws IOException, InterruptedException { + float progress = (length - fsdis.getPos()) / (float) length; + if (progress < 0) + return 0; + return progress; + } + + @Override + public void initialize(InputSplit is, TaskAttemptContext context) throws IOException, InterruptedException { + FileSplit fileSplit = (FileSplit) is; + + Configuration conf = new Configuration(); + FileSystem fs = FileSystem.get(conf); + + key = new LogFileKey(); + value = new LogFileValue(); + + fsdis = fs.open(fileSplit.getPath()); + FileStatus status = fs.getFileStatus(fileSplit.getPath()); + length = status.getLen(); + } + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { + if (key == null) + return false; + + try { + key.readFields(fsdis); + value.readFields(fsdis); + return true; + } catch (EOFException ex) { + key = null; + value = null; + return false; + } + } + + } + + + @Override + public RecordReader createRecordReader(InputSplit arg0, TaskAttemptContext arg1) throws IOException, InterruptedException { + return new LogFileRecordReader(); + } + + @Override + protected boolean isSplitable(JobContext context, Path filename) { + return false; + } + +} Added: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/LogFileOutputFormat.java URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/LogFileOutputFormat.java?rev=1349503&view=auto ============================================================================== --- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/LogFileOutputFormat.java (added) +++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/LogFileOutputFormat.java Tue Jun 12 20:21:11 2012 @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.metanalysis; + +import java.io.IOException; + +import org.apache.accumulo.server.logger.LogFileKey; +import org.apache.accumulo.server.logger.LogFileValue; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; + +/** + * Output format for Accumulo write ahead logs. + */ +public class LogFileOutputFormat extends FileOutputFormat { + + private static class LogFileRecordWriter extends RecordWriter { + + private FSDataOutputStream out; + + /** + * @param outputPath + * @throws IOException + */ + public LogFileRecordWriter(Path outputPath) throws IOException { + Configuration conf = new Configuration(); + FileSystem fs = FileSystem.get(conf); + + out = fs.create(outputPath); + } + + @Override + public void close(TaskAttemptContext arg0) throws IOException, InterruptedException { + out.close(); + } + + @Override + public void write(LogFileKey key, LogFileValue val) throws IOException, InterruptedException { + key.write(out); + val.write(out); + } + + } + + @Override + public RecordWriter getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException { + Path outputPath = getDefaultWorkFile(context, ""); + return new LogFileRecordWriter(outputPath); + } + +} Added: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/PrintEvents.java URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/PrintEvents.java?rev=1349503&view=auto ============================================================================== --- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/PrintEvents.java (added) +++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/PrintEvents.java Tue Jun 12 20:21:11 2012 @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.metanalysis; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.util.List; +import java.util.Map.Entry; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.data.ColumnUpdate; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.PartialKey; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.server.logger.LogFileValue; +import org.apache.hadoop.io.Text; + +/** + * Looks up and prints mutations indexed by IndexMeta + */ +public class PrintEvents { + + /** + * @param args + */ + public static void main(String[] args) throws Exception { + if (args.length != 7) { + System.err.println("Usage : " + IndexMeta.class + "