accumulo-commits mailing list archives

From ctubb...@apache.org
Subject [49/53] [abbrv] ACCUMULO-658 Move tests and resources to correct modules
Date Fri, 06 Sep 2013 01:49:17 GMT
http://git-wip-us.apache.org/repos/asf/accumulo/blob/162bd40d/server/tserver/src/main/java/org/apache/accumulo/tserver/constraints/MetadataConstraints.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/constraints/MetadataConstraints.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/constraints/MetadataConstraints.java
deleted file mode 100644
index f190cee..0000000
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/constraints/MetadataConstraints.java
+++ /dev/null
@@ -1,315 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.tserver.constraints;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-
-import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.constraints.Constraint;
-import org.apache.accumulo.core.data.ColumnUpdate;
-import org.apache.accumulo.core.data.KeyExtent;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.metadata.MetadataTable;
-import org.apache.accumulo.core.metadata.schema.DataFileValue;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ClonedColumnFamily;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily;
-import org.apache.accumulo.core.util.ColumnFQ;
-import org.apache.accumulo.core.zookeeper.ZooUtil;
-import org.apache.accumulo.fate.zookeeper.TransactionWatcher.Arbitrator;
-import org.apache.accumulo.server.client.HdfsZooInstance;
-import org.apache.accumulo.server.zookeeper.TransactionWatcher.ZooArbitrator;
-import org.apache.accumulo.server.zookeeper.ZooCache;
-import org.apache.accumulo.server.zookeeper.ZooLock;
-import org.apache.hadoop.io.Text;
-import org.apache.log4j.Logger;
-
-public class MetadataConstraints implements Constraint {
-  
-  private ZooCache zooCache = null;
-  private String zooRoot = null;
-  
-  private static final Logger log = Logger.getLogger(MetadataConstraints.class);
-  
-  private static boolean[] validTableNameChars = new boolean[256];
-  
-  static {
-    for (int i = 0; i < 256; i++) {
-      validTableNameChars[i] = ((i >= 'a' && i <= 'z') || (i >= '0' && i <= '9')) || i == '!';
-    }
-  }
-  
-  private static final HashSet<ColumnFQ> validColumnQuals = new HashSet<ColumnFQ>(Arrays.asList(new ColumnFQ[] {
-      TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN, TabletsSection.TabletColumnFamily.OLD_PREV_ROW_COLUMN,
-      TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN, TabletsSection.TabletColumnFamily.SPLIT_RATIO_COLUMN, TabletsSection.ServerColumnFamily.TIME_COLUMN,
-      TabletsSection.ServerColumnFamily.LOCK_COLUMN, TabletsSection.ServerColumnFamily.FLUSH_COLUMN, TabletsSection.ServerColumnFamily.COMPACT_COLUMN}));
-  
-  private static final HashSet<Text> validColumnFams = new HashSet<Text>(Arrays.asList(new Text[] {TabletsSection.BulkFileColumnFamily.NAME,
-      LogColumnFamily.NAME, ScanFileColumnFamily.NAME, DataFileColumnFamily.NAME,
-      TabletsSection.CurrentLocationColumnFamily.NAME, TabletsSection.LastLocationColumnFamily.NAME, TabletsSection.FutureLocationColumnFamily.NAME,
-      ChoppedColumnFamily.NAME, ClonedColumnFamily.NAME}));
-  
-  private static boolean isValidColumn(ColumnUpdate cu) {
-    
-    if (validColumnFams.contains(new Text(cu.getColumnFamily())))
-      return true;
-    
-    if (validColumnQuals.contains(new ColumnFQ(cu)))
-      return true;
-    
-    return false;
-  }
-  
-  static private ArrayList<Short> addViolation(ArrayList<Short> lst, int violation) {
-    if (lst == null)
-      lst = new ArrayList<Short>();
-    lst.add((short) violation);
-    return lst;
-  }
-  
-  static private ArrayList<Short> addIfNotPresent(ArrayList<Short> lst, int intViolation) {
-    if (lst == null)
-      return addViolation(lst, intViolation);
-    short violation = (short) intViolation;
-    if (!lst.contains(violation))
-      return addViolation(lst, intViolation);
-    return lst;
-  }
-  
-  @Override
-  public List<Short> check(Environment env, Mutation mutation) {
-    
-    ArrayList<Short> violations = null;
-    
-    Collection<ColumnUpdate> colUpdates = mutation.getUpdates();
-    
-    // check the row; it should contain at least one ';' or end with '<'
-    boolean containsSemiC = false;
-    
-    byte[] row = mutation.getRow();
-    
-    // always allow rows that fall within reserved areas
-    if (row.length > 0 && row[0] == '~')
-      return null;
-    if (row.length > 2 && row[0] == '!' && row[1] == '!' && row[2] == '~')
-      return null;
-    
-    for (byte b : row) {
-      if (b == ';') {
-        containsSemiC = true;
-      }
-      
-      if (b == ';' || b == '<')
-        break;
-      
-      if (!validTableNameChars[0xff & b]) {
-        violations = addIfNotPresent(violations, 4);
-      }
-    }
-    
-    if (!containsSemiC) {
-      // see if last row char is <
-      if (row.length == 0 || row[row.length - 1] != '<') {
-        violations = addIfNotPresent(violations, 4);
-      }
-    } else {
-      if (row.length == 0) {
-        violations = addIfNotPresent(violations, 4);
-      }
-    }
-    
-    if (row.length > 0 && row[0] == '!') {
-      if (row.length < 3 || row[1] != '0' || (row[2] != '<' && row[2] != ';')) {
-        violations = addIfNotPresent(violations, 4);
-      }
-    }
-    
-    // ensure row is not less than MetadataTable.ID
-    if (new Text(row).compareTo(new Text(MetadataTable.ID)) < 0) {
-      violations = addViolation(violations, 5);
-    }
-    
-    boolean checkedBulk = false;
-    
-    for (ColumnUpdate columnUpdate : colUpdates) {
-      Text columnFamily = new Text(columnUpdate.getColumnFamily());
-      
-      if (columnUpdate.isDeleted()) {
-        if (!isValidColumn(columnUpdate)) {
-          violations = addViolation(violations, 2);
-        }
-        continue;
-      }
-      
-      if (columnUpdate.getValue().length == 0 && !columnFamily.equals(ScanFileColumnFamily.NAME)) {
-        violations = addViolation(violations, 6);
-      }
-      
-      if (columnFamily.equals(DataFileColumnFamily.NAME)) {
-        try {
-          DataFileValue dfv = new DataFileValue(columnUpdate.getValue());
-          
-          if (dfv.getSize() < 0 || dfv.getNumEntries() < 0) {
-            violations = addViolation(violations, 1);
-          }
-        } catch (NumberFormatException nfe) {
-          violations = addViolation(violations, 1);
-        } catch (ArrayIndexOutOfBoundsException aiooe) {
-          violations = addViolation(violations, 1);
-        }
-      } else if (columnFamily.equals(ScanFileColumnFamily.NAME)) {
-        // nothing further to validate; scan file entries may legitimately have empty values
-      } else if (columnFamily.equals(TabletsSection.BulkFileColumnFamily.NAME)) {
-        if (!columnUpdate.isDeleted() && !checkedBulk) {
-          // splits, which also write the time reference, are allowed to write this reference even when
-          // the transaction is not running because the other half of the tablet is holding a reference
-          // to the file.
-          boolean isSplitMutation = false;
-          // When a tablet is assigned, it re-writes the metadata. It should probably only update the location information,
-          // but it writes everything. We allow it to re-write the bulk information if it is setting the location.
-          // See ACCUMULO-1230.
-          boolean isLocationMutation = false;
-          
-          HashSet<Text> dataFiles = new HashSet<Text>();
-          HashSet<Text> loadedFiles = new HashSet<Text>();
-          
-          String tidString = new String(columnUpdate.getValue());
-          int otherTidCount = 0;
-          
-          for (ColumnUpdate update : mutation.getUpdates()) {
-            if (new ColumnFQ(update).equals(TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN)) {
-              isSplitMutation = true;
-            } else if (new Text(update.getColumnFamily()).equals(TabletsSection.CurrentLocationColumnFamily.NAME)) {
-              isLocationMutation = true;
-            } else if (new Text(update.getColumnFamily()).equals(DataFileColumnFamily.NAME)) {
-              dataFiles.add(new Text(update.getColumnQualifier()));
-            } else if (new Text(update.getColumnFamily()).equals(TabletsSection.BulkFileColumnFamily.NAME)) {
-              loadedFiles.add(new Text(update.getColumnQualifier()));
-              
-              if (!new String(update.getValue()).equals(tidString)) {
-                otherTidCount++;
-              }
-            }
-          }
-          
-          if (!isSplitMutation && !isLocationMutation) {
-            long tid = Long.parseLong(tidString);
-            
-            try {
-              if (otherTidCount > 0 || !dataFiles.equals(loadedFiles) || !getArbitrator().transactionAlive(Constants.BULK_ARBITRATOR_TYPE, tid)) {
-                violations = addViolation(violations, 8);
-              }
-            } catch (Exception ex) {
-              violations = addViolation(violations, 8);
-            }
-          }
-          
-          checkedBulk = true;
-        }
-      } else {
-        if (!isValidColumn(columnUpdate)) {
-          violations = addViolation(violations, 2);
-        } else if (new ColumnFQ(columnUpdate).equals(TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN) && columnUpdate.getValue().length > 0
-            && (violations == null || !violations.contains((short) 4))) {
-          KeyExtent ke = new KeyExtent(new Text(mutation.getRow()), (Text) null);
-          
-          Text per = KeyExtent.decodePrevEndRow(new Value(columnUpdate.getValue()));
-          
-          boolean prevEndRowLessThanEndRow = per == null || ke.getEndRow() == null || per.compareTo(ke.getEndRow()) < 0;
-          
-          if (!prevEndRowLessThanEndRow) {
-            violations = addViolation(violations, 3);
-          }
-        } else if (new ColumnFQ(columnUpdate).equals(TabletsSection.ServerColumnFamily.LOCK_COLUMN)) {
-          if (zooCache == null) {
-            zooCache = new ZooCache();
-          }
-          
-          if (zooRoot == null) {
-            zooRoot = ZooUtil.getRoot(HdfsZooInstance.getInstance());
-          }
-          
-          boolean lockHeld = false;
-          String lockId = new String(columnUpdate.getValue());
-          
-          try {
-            lockHeld = ZooLock.isLockHeld(zooCache, new ZooUtil.LockID(zooRoot, lockId));
-          } catch (Exception e) {
-            log.debug("Failed to verify lock was held " + lockId + " " + e.getMessage());
-          }
-          
-          if (!lockHeld) {
-            violations = addViolation(violations, 7);
-          }
-        }
-        
-      }
-    }
-    
-    if (violations != null) {
-      log.debug("violating metadata mutation : " + new String(mutation.getRow()));
-      for (ColumnUpdate update : mutation.getUpdates()) {
-        log.debug(" update: " + new String(update.getColumnFamily()) + ":" + new String(update.getColumnQualifier()) + " value "
-            + (update.isDeleted() ? "[delete]" : new String(update.getValue())));
-      }
-    }
-    
-    return violations;
-  }
-  
-  protected Arbitrator getArbitrator() {
-    return new ZooArbitrator();
-  }
-  
-  @Override
-  public String getViolationDescription(short violationCode) {
-    switch (violationCode) {
-      case 1:
-        return "data file size must be a non-negative integer";
-      case 2:
-        return "Invalid column name given.";
-      case 3:
-        return "Prev end row is greater than or equal to end row.";
-      case 4:
-        return "Invalid metadata row format";
-      case 5:
-        return "Row can not be less than " + MetadataTable.ID;
-      case 6:
-        return "Empty values are not allowed for any " + MetadataTable.NAME + " column";
-      case 7:
-        return "Lock not held in zookeeper by writer";
-      case 8:
-        return "Bulk load transaction no longer running";
-    }
-    return null;
-  }
-  
-  @Override
-  protected void finalize() {
-    if (zooCache != null)
-      zooCache.clear();
-  }
-  
-}
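
For context on violation code 4 above: a metadata row encodes a tablet as its table id followed either by ';' and the tablet's end row (e.g. "1;m") or by a trailing '<' for a table's last tablet (e.g. "1<"). A minimal standalone sketch of that rule, using a hypothetical helper that is not part of this patch:

    // Sketch only: mirrors the row-format check in MetadataConstraints.check()
    static boolean looksLikeMetadataRow(byte[] row) {
      for (int i = 0; i < row.length; i++) {
        if (row[i] == ';')
          return true;                  // table id terminated by the end-row separator
        if (row[i] == '<')
          return i == row.length - 1;   // '<' is only valid as the final byte
      }
      return false;                     // neither ';' nor a trailing '<' was found
    }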

http://git-wip-us.apache.org/repos/asf/accumulo/blob/162bd40d/server/tserver/src/main/java/org/apache/accumulo/tserver/iterators/MetadataBulkLoadFilter.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/iterators/MetadataBulkLoadFilter.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/iterators/MetadataBulkLoadFilter.java
deleted file mode 100644
index 8c4c4e2..0000000
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/iterators/MetadataBulkLoadFilter.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.tserver.iterators;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.iterators.Filter;
-import org.apache.accumulo.core.iterators.IteratorEnvironment;
-import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
-import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
-import org.apache.accumulo.fate.zookeeper.TransactionWatcher.Arbitrator;
-import org.apache.accumulo.server.zookeeper.TransactionWatcher.ZooArbitrator;
-import org.apache.log4j.Logger;
-
-/**
- * A special iterator for the metadata table that removes inactive bulk load flags
- * 
- */
-public class MetadataBulkLoadFilter extends Filter {
-  private static Logger log = Logger.getLogger(MetadataBulkLoadFilter.class);
-  
-  enum Status {
-    ACTIVE, INACTIVE
-  }
-  
-  Map<Long,Status> bulkTxStatusCache;
-  Arbitrator arbitrator;
-  
-  @Override
-  public boolean accept(Key k, Value v) {
-    if (!k.isDeleted() && k.compareColumnFamily(TabletsSection.BulkFileColumnFamily.NAME) == 0) {
-      long txid = Long.valueOf(v.toString());
-      
-      Status status = bulkTxStatusCache.get(txid);
-      if (status == null) {
-        try {
-          if (arbitrator.transactionComplete(Constants.BULK_ARBITRATOR_TYPE, txid)) {
-            status = Status.INACTIVE;
-          } else {
-            status = Status.ACTIVE;
-          }
-        } catch (Exception e) {
-          status = Status.ACTIVE;
-          log.error(e, e);
-        }
-        
-        bulkTxStatusCache.put(txid, status);
-      }
-      
-      return status == Status.ACTIVE;
-    }
-    
-    return true;
-  }
-  
-  @Override
-  public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
-    super.init(source, options, env);
-    
-    if (env.getIteratorScope() == IteratorScope.scan) {
-      throw new IOException("This iterator is not intended for use at scan time");
-    }
-    
-    bulkTxStatusCache = new HashMap<Long,MetadataBulkLoadFilter.Status>();
-    arbitrator = getArbitrator();
-  }
-  
-  protected Arbitrator getArbitrator() {
-    return new ZooArbitrator();
-  }
-}
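
MetadataBulkLoadFilter above follows Accumulo's Filter pattern: a subclass implements only accept(Key, Value), and the Filter superclass suppresses every entry for which accept returns false. A minimal sketch of the same pattern, using a hypothetical filter that is not part of this commit:

    import org.apache.accumulo.core.data.Key;
    import org.apache.accumulo.core.data.Value;
    import org.apache.accumulo.core.iterators.Filter;

    // Sketch only: a filter that keeps entries whose value is non-empty
    public class NonEmptyValueFilter extends Filter {
      @Override
      public boolean accept(Key k, Value v) {
        return v.getSize() > 0;
      }
    }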

http://git-wip-us.apache.org/repos/asf/accumulo/blob/162bd40d/server/tserver/src/test/java/org/apache/accumulo/server/tabletserver/CheckTabletMetadataTest.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/test/java/org/apache/accumulo/server/tabletserver/CheckTabletMetadataTest.java b/server/tserver/src/test/java/org/apache/accumulo/server/tabletserver/CheckTabletMetadataTest.java
new file mode 100644
index 0000000..aa7e7a4
--- /dev/null
+++ b/server/tserver/src/test/java/org/apache/accumulo/server/tabletserver/CheckTabletMetadataTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.server.tabletserver;
+
+import java.util.TreeMap;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
+import org.apache.accumulo.core.util.ColumnFQ;
+import org.apache.accumulo.master.state.TServerInstance;
+import org.apache.hadoop.io.Text;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class CheckTabletMetadataTest {
+  
+  private static Key nk(String row, ColumnFQ cfq) {
+    return new Key(new Text(row), cfq.getColumnFamily(), cfq.getColumnQualifier());
+  }
+  
+  private static Key nk(String row, Text cf, String cq) {
+    return new Key(row, cf.toString(), cq);
+  }
+  
+  private static void put(TreeMap<Key,Value> tabletMeta, String row, ColumnFQ cfq, byte[] val) {
+    Key k = new Key(new Text(row), cfq.getColumnFamily(), cfq.getColumnQualifier());
+    tabletMeta.put(k, new Value(val));
+  }
+  
+  private static void put(TreeMap<Key,Value> tabletMeta, String row, Text cf, String cq, String val) {
+    Key k = new Key(new Text(row), cf, new Text(cq));
+    tabletMeta.put(k, new Value(val.getBytes()));
+  }
+  
+  private static void assertFail(TreeMap<Key,Value> tabletMeta, KeyExtent ke, TServerInstance tsi) {
+    try {
+      Assert.assertNull(TabletServer.checkTabletMetadata(ke, tsi, tabletMeta, ke.getMetadataEntry()));
+    } catch (Exception e) {
+      // an exception from checkTabletMetadata also counts as a failed check
+    }
+  }
+  
+  private static void assertFail(TreeMap<Key,Value> tabletMeta, KeyExtent ke, TServerInstance tsi, Key keyToDelete) {
+    TreeMap<Key,Value> copy = new TreeMap<Key,Value>(tabletMeta);
+    Assert.assertNotNull(copy.remove(keyToDelete));
+    try {
+      Assert.assertNull(TabletServer.checkTabletMetadata(ke, tsi, copy, ke.getMetadataEntry()));
+    } catch (Exception e) {
+      // an exception from checkTabletMetadata also counts as a failed check
+    }
+  }
+  
+  @Test
+  public void testBadTabletMetadata() throws Exception {
+    
+    KeyExtent ke = new KeyExtent(new Text("1"), null, null);
+    
+    TreeMap<Key,Value> tabletMeta = new TreeMap<Key,Value>();
+    
+    put(tabletMeta, "1<", TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN, KeyExtent.encodePrevEndRow(null).get());
+    put(tabletMeta, "1<", TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN, "/t1".getBytes());
+    put(tabletMeta, "1<", TabletsSection.ServerColumnFamily.TIME_COLUMN, "M0".getBytes());
+    put(tabletMeta, "1<", TabletsSection.FutureLocationColumnFamily.NAME, "4", "127.0.0.1:9997");
+    
+    TServerInstance tsi = new TServerInstance("127.0.0.1:9997", 4);
+    
+    Assert.assertNotNull(TabletServer.checkTabletMetadata(ke, tsi, tabletMeta, ke.getMetadataEntry()));
+    
+    assertFail(tabletMeta, ke, new TServerInstance("127.0.0.1:9998", 4));
+    assertFail(tabletMeta, ke, new TServerInstance("127.0.0.1:9998", 5));
+    assertFail(tabletMeta, ke, new TServerInstance("127.0.0.1:9997", 5));
+    assertFail(tabletMeta, ke, new TServerInstance("127.0.0.2:9997", 4));
+    assertFail(tabletMeta, ke, new TServerInstance("127.0.0.2:9997", 5));
+    
+    assertFail(tabletMeta, new KeyExtent(new Text("1"), null, new Text("m")), tsi);
+    
+    assertFail(tabletMeta, new KeyExtent(new Text("1"), new Text("r"), new Text("m")), tsi);
+    
+    assertFail(tabletMeta, ke, tsi, nk("1<", TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN));
+    
+    assertFail(tabletMeta, ke, tsi, nk("1<", TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN));
+    
+    assertFail(tabletMeta, ke, tsi, nk("1<", TabletsSection.ServerColumnFamily.TIME_COLUMN));
+    
+    assertFail(tabletMeta, ke, tsi, nk("1<", TabletsSection.FutureLocationColumnFamily.NAME, "4"));
+    
+    TreeMap<Key,Value> copy = new TreeMap<Key,Value>(tabletMeta);
+    put(copy, "1<", TabletsSection.CurrentLocationColumnFamily.NAME, "4", "127.0.0.1:9997");
+    assertFail(copy, ke, tsi);
+    assertFail(copy, ke, tsi, nk("1<", TabletsSection.FutureLocationColumnFamily.NAME, "4"));
+    
+    copy = new TreeMap<Key,Value>(tabletMeta);
+    put(copy, "1<", TabletsSection.CurrentLocationColumnFamily.NAME, "5", "127.0.0.1:9998");
+    assertFail(copy, ke, tsi);
+    put(copy, "1<", TabletsSection.CurrentLocationColumnFamily.NAME, "6", "127.0.0.1:9999");
+    assertFail(copy, ke, tsi);
+    
+    copy = new TreeMap<Key,Value>(tabletMeta);
+    put(copy, "1<", TabletsSection.FutureLocationColumnFamily.NAME, "5", "127.0.0.1:9998");
+    assertFail(copy, ke, tsi);
+    
+    assertFail(new TreeMap<Key,Value>(), ke, tsi);
+    
+  }
+}
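
The rows used above are KeyExtent metadata entries: table id "1" with a null end row encodes as "1<", while an end row of "m" should encode as "1;m". A short sketch of that correspondence, using only calls that already appear in the test:

    // Sketch only: how a KeyExtent maps to its metadata row
    KeyExtent lastTablet = new KeyExtent(new Text("1"), null, null);
    assert lastTablet.getMetadataEntry().toString().equals("1<");

    KeyExtent middleTablet = new KeyExtent(new Text("1"), new Text("m"), null);
    assert middleTablet.getMetadataEntry().toString().equals("1;m");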

http://git-wip-us.apache.org/repos/asf/accumulo/blob/162bd40d/server/tserver/src/test/java/org/apache/accumulo/server/tabletserver/InMemoryMapTest.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/test/java/org/apache/accumulo/server/tabletserver/InMemoryMapTest.java b/server/tserver/src/test/java/org/apache/accumulo/server/tabletserver/InMemoryMapTest.java
new file mode 100644
index 0000000..fd5e661
--- /dev/null
+++ b/server/tserver/src/test/java/org/apache/accumulo/server/tabletserver/InMemoryMapTest.java
@@ -0,0 +1,492 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.tabletserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.TestCase;
+
+import org.apache.accumulo.core.data.ArrayByteSequence;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.system.ColumnFamilySkippingIterator;
+import org.apache.accumulo.core.util.LocalityGroupUtil;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.conf.ZooConfiguration;
+import org.apache.accumulo.server.tabletserver.InMemoryMap.MemoryIterator;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.junit.Before;
+
+public class InMemoryMapTest extends TestCase {
+  
+  @Before
+  public void setUp() throws Exception {
+    // suppress log messages having to do with not having an instance
+    Logger.getLogger(ZooConfiguration.class).setLevel(Level.OFF);
+    Logger.getLogger(HdfsZooInstance.class).setLevel(Level.OFF);
+  }
+  
+  public void mutate(InMemoryMap imm, String row, String column, long ts) {
+    Mutation m = new Mutation(new Text(row));
+    String[] sa = column.split(":");
+    m.putDelete(new Text(sa[0]), new Text(sa[1]), ts);
+    
+    imm.mutate(Collections.singletonList(m));
+  }
+  
+  public void mutate(InMemoryMap imm, String row, String column, long ts, String value) {
+    Mutation m = new Mutation(new Text(row));
+    String[] sa = column.split(":");
+    m.put(new Text(sa[0]), new Text(sa[1]), ts, new Value(value.getBytes()));
+    
+    imm.mutate(Collections.singletonList(m));
+  }
+  
+  static Key nk(String row, String column, long ts) {
+    String[] sa = column.split(":");
+    Key k = new Key(new Text(row), new Text(sa[0]), new Text(sa[1]), ts);
+    return k;
+  }
+  
+  static void ae(SortedKeyValueIterator<Key,Value> dc, String row, String column, int ts, String val) throws IOException {
+    assertTrue(dc.hasTop());
+    assertEquals(nk(row, column, ts), dc.getTopKey());
+    assertEquals(new Value(val.getBytes()), dc.getTopValue());
+    dc.next();
+    
+  }
+  
+  static Set<ByteSequence> newCFSet(String... cfs) {
+    HashSet<ByteSequence> cfSet = new HashSet<ByteSequence>();
+    for (String cf : cfs) {
+      cfSet.add(new ArrayByteSequence(cf));
+    }
+    return cfSet;
+  }
+
+  public void test2() throws Exception {
+    InMemoryMap imm = new InMemoryMap(false, "/tmp");
+    
+    MemoryIterator ski1 = imm.skvIterator();
+    mutate(imm, "r1", "foo:cq1", 3, "bar1");
+    MemoryIterator ski2 = imm.skvIterator();
+    
+    ski1.seek(new Range(), LocalityGroupUtil.EMPTY_CF_SET, false);
+    assertFalse(ski1.hasTop());
+    
+    ski2.seek(new Range(), LocalityGroupUtil.EMPTY_CF_SET, false);
+    assertTrue(ski2.hasTop());
+    ae(ski2, "r1", "foo:cq1", 3, "bar1");
+    assertFalse(ski2.hasTop());
+    
+  }
+  
+  public void test3() throws Exception {
+    InMemoryMap imm = new InMemoryMap(false, "/tmp");
+    
+    mutate(imm, "r1", "foo:cq1", 3, "bar1");
+    mutate(imm, "r1", "foo:cq1", 3, "bar2");
+    MemoryIterator ski1 = imm.skvIterator();
+    mutate(imm, "r1", "foo:cq1", 3, "bar3");
+    
+    mutate(imm, "r3", "foo:cq1", 3, "bar9");
+    mutate(imm, "r3", "foo:cq1", 3, "bara");
+    
+    MemoryIterator ski2 = imm.skvIterator();
+    
+    ski1.seek(new Range(new Text("r1")), LocalityGroupUtil.EMPTY_CF_SET, false);
+    ae(ski1, "r1", "foo:cq1", 3, "bar2");
+    ae(ski1, "r1", "foo:cq1", 3, "bar1");
+    assertFalse(ski1.hasTop());
+    
+    ski2.seek(new Range(new Text("r3")), LocalityGroupUtil.EMPTY_CF_SET, false);
+    ae(ski2, "r3", "foo:cq1", 3, "bara");
+    ae(ski2, "r3", "foo:cq1", 3, "bar9");
+    assertFalse(ski2.hasTop());
+    
+  }
+  
+  public void test4() throws Exception {
+    InMemoryMap imm = new InMemoryMap(false, "/tmp");
+    
+    mutate(imm, "r1", "foo:cq1", 3, "bar1");
+    mutate(imm, "r1", "foo:cq1", 3, "bar2");
+    MemoryIterator ski1 = imm.skvIterator();
+    mutate(imm, "r1", "foo:cq1", 3, "bar3");
+    
+    imm.delete(0);
+    
+    ski1.seek(new Range(new Text("r1")), LocalityGroupUtil.EMPTY_CF_SET, false);
+    ae(ski1, "r1", "foo:cq1", 3, "bar2");
+    ae(ski1, "r1", "foo:cq1", 3, "bar1");
+    assertFalse(ski1.hasTop());
+    
+    ski1.seek(new Range(new Text("r1")), LocalityGroupUtil.EMPTY_CF_SET, false);
+    ae(ski1, "r1", "foo:cq1", 3, "bar2");
+    ae(ski1, "r1", "foo:cq1", 3, "bar1");
+    assertFalse(ski1.hasTop());
+    
+    ski1.seek(new Range(new Text("r2")), LocalityGroupUtil.EMPTY_CF_SET, false);
+    assertFalse(ski1.hasTop());
+    
+    ski1.seek(new Range(nk("r1", "foo:cq1", 3), null), LocalityGroupUtil.EMPTY_CF_SET, false);
+    ae(ski1, "r1", "foo:cq1", 3, "bar2");
+    ae(ski1, "r1", "foo:cq1", 3, "bar1");
+    assertFalse(ski1.hasTop());
+    
+    ski1.close();
+  }
+  
+  public void test5() throws Exception {
+    InMemoryMap imm = new InMemoryMap(false, "/tmp");
+    
+    mutate(imm, "r1", "foo:cq1", 3, "bar1");
+    mutate(imm, "r1", "foo:cq1", 3, "bar2");
+    mutate(imm, "r1", "foo:cq1", 3, "bar3");
+    
+    MemoryIterator ski1 = imm.skvIterator();
+    ski1.seek(new Range(new Text("r1")), LocalityGroupUtil.EMPTY_CF_SET, false);
+    ae(ski1, "r1", "foo:cq1", 3, "bar3");
+    
+    imm.delete(0);
+    
+    ae(ski1, "r1", "foo:cq1", 3, "bar2");
+    ae(ski1, "r1", "foo:cq1", 3, "bar1");
+    assertFalse(ski1.hasTop());
+    
+    ski1.close();
+    
+    imm = new InMemoryMap(false, "/tmp");
+    
+    mutate(imm, "r1", "foo:cq1", 3, "bar1");
+    mutate(imm, "r1", "foo:cq2", 3, "bar2");
+    mutate(imm, "r1", "foo:cq3", 3, "bar3");
+    
+    ski1 = imm.skvIterator();
+    ski1.seek(new Range(new Text("r1")), LocalityGroupUtil.EMPTY_CF_SET, false);
+    ae(ski1, "r1", "foo:cq1", 3, "bar1");
+    
+    imm.delete(0);
+    
+    ae(ski1, "r1", "foo:cq2", 3, "bar2");
+    ae(ski1, "r1", "foo:cq3", 3, "bar3");
+    assertFalse(ski1.hasTop());
+    
+    ski1.close();
+  }
+  
+  public void test6() throws Exception {
+    InMemoryMap imm = new InMemoryMap(false, "/tmp");
+    
+    mutate(imm, "r1", "foo:cq1", 3, "bar1");
+    mutate(imm, "r1", "foo:cq2", 3, "bar2");
+    mutate(imm, "r1", "foo:cq3", 3, "bar3");
+    mutate(imm, "r1", "foo:cq4", 3, "bar4");
+    
+    MemoryIterator ski1 = imm.skvIterator();
+    
+    mutate(imm, "r1", "foo:cq5", 3, "bar5");
+    
+    SortedKeyValueIterator<Key,Value> dc = ski1.deepCopy(null);
+    
+    ski1.seek(new Range(nk("r1", "foo:cq1", 3), null), LocalityGroupUtil.EMPTY_CF_SET, false);
+    ae(ski1, "r1", "foo:cq1", 3, "bar1");
+    
+    dc.seek(new Range(nk("r1", "foo:cq2", 3), null), LocalityGroupUtil.EMPTY_CF_SET, false);
+    ae(dc, "r1", "foo:cq2", 3, "bar2");
+    
+    imm.delete(0);
+    
+    ae(ski1, "r1", "foo:cq2", 3, "bar2");
+    ae(dc, "r1", "foo:cq3", 3, "bar3");
+    ae(ski1, "r1", "foo:cq3", 3, "bar3");
+    ae(dc, "r1", "foo:cq4", 3, "bar4");
+    ae(ski1, "r1", "foo:cq4", 3, "bar4");
+    assertFalse(ski1.hasTop());
+    assertFalse(dc.hasTop());
+    
+    ski1.seek(new Range(nk("r1", "foo:cq3", 3), null), LocalityGroupUtil.EMPTY_CF_SET, false);
+    
+    dc.seek(new Range(nk("r1", "foo:cq4", 3), null), LocalityGroupUtil.EMPTY_CF_SET, false);
+    ae(dc, "r1", "foo:cq4", 3, "bar4");
+    assertFalse(dc.hasTop());
+    
+    ae(ski1, "r1", "foo:cq3", 3, "bar3");
+    ae(ski1, "r1", "foo:cq4", 3, "bar4");
+    assertFalse(ski1.hasTop());
+    assertFalse(dc.hasTop());
+    
+    ski1.close();
+  }
+  
+  public void testBug1() throws Exception {
+    InMemoryMap imm = new InMemoryMap(false, "/tmp");
+    
+    for (int i = 0; i < 20; i++) {
+      mutate(imm, "r1", "foo:cq" + i, 3, "bar" + i);
+    }
+    
+    for (int i = 0; i < 20; i++) {
+      mutate(imm, "r2", "foo:cq" + i, 3, "bar" + i);
+    }
+    
+    MemoryIterator ski1 = imm.skvIterator();
+    ColumnFamilySkippingIterator cfsi = new ColumnFamilySkippingIterator(ski1);
+    
+    imm.delete(0);
+    
+    ArrayList<ByteSequence> columns = new ArrayList<ByteSequence>();
+    columns.add(new ArrayByteSequence("bar"));
+    
+    // this seek resulted in an infinite loop before a bug was fixed
+    cfsi.seek(new Range("r1"), columns, true);
+    
+    assertFalse(cfsi.hasTop());
+    
+    ski1.close();
+  }
+  
+  public void testSeekBackWards() throws Exception {
+    InMemoryMap imm = new InMemoryMap(false, "/tmp");
+    
+    mutate(imm, "r1", "foo:cq1", 3, "bar1");
+    mutate(imm, "r1", "foo:cq2", 3, "bar2");
+    mutate(imm, "r1", "foo:cq3", 3, "bar3");
+    mutate(imm, "r1", "foo:cq4", 3, "bar4");
+    
+    MemoryIterator skvi1 = imm.skvIterator();
+    
+    skvi1.seek(new Range(nk("r1", "foo:cq3", 3), null), LocalityGroupUtil.EMPTY_CF_SET, false);
+    ae(skvi1, "r1", "foo:cq3", 3, "bar3");
+    
+    skvi1.seek(new Range(nk("r1", "foo:cq1", 3), null), LocalityGroupUtil.EMPTY_CF_SET, false);
+    ae(skvi1, "r1", "foo:cq1", 3, "bar1");
+    
+  }
+  
+  public void testDuplicateKey() throws Exception {
+    InMemoryMap imm = new InMemoryMap(false, "/tmp");
+    
+    Mutation m = new Mutation(new Text("r1"));
+    m.put(new Text("foo"), new Text("cq"), 3, new Value("v1".getBytes()));
+    m.put(new Text("foo"), new Text("cq"), 3, new Value("v2".getBytes()));
+    imm.mutate(Collections.singletonList(m));
+    
+    MemoryIterator skvi1 = imm.skvIterator();
+    skvi1.seek(new Range(), LocalityGroupUtil.EMPTY_CF_SET, false);
+    ae(skvi1, "r1", "foo:cq", 3, "v2");
+    ae(skvi1, "r1", "foo:cq", 3, "v1");
+  }
+  
+  private static final Logger log = Logger.getLogger(InMemoryMapTest.class);
+  
+  static long sum(long[] counts) {
+    long result = 0;
+    for (int i = 0; i < counts.length; i++)
+      result += counts[i];
+    return result;
+  }
+  
+  // @Test - hard to get this timing test to run well on apache build machines
+  public void parallelWriteSpeed() throws InterruptedException {
+    List<Double> timings = new ArrayList<Double>();
+    for (int threads : new int[] {1, 2, 16, /* 64, 256 */}) {
+      final long now = System.currentTimeMillis();
+      final long counts[] = new long[threads];
+      final InMemoryMap imm = new InMemoryMap(false, "/tmp");
+      ExecutorService e = Executors.newFixedThreadPool(threads);
+      for (int j = 0; j < threads; j++) {
+        final int threadId = j;
+        e.execute(new Runnable() {
+          @Override
+          public void run() {
+            while (System.currentTimeMillis() - now < 1000) {
+              for (int k = 0; k < 1000; k++) {
+                Mutation m = new Mutation("row");
+                m.put("cf", "cq", new Value("v".getBytes()));
+                List<Mutation> mutations = Collections.singletonList(m);
+                imm.mutate(mutations);
+                counts[threadId]++;
+              }
+            }
+          }
+        });
+      }
+      e.shutdown();
+      e.awaitTermination(10, TimeUnit.SECONDS);
+      imm.delete(10000);
+      double mutationsPerSecond = sum(counts) / ((System.currentTimeMillis() - now) / 1000.);
+      timings.add(mutationsPerSecond);
+      log.info(String.format("%.1f mutations per second with %d threads", mutationsPerSecond, threads));
+    }
+    // verify that more threads don't go a lot faster, or a lot slower, than one thread
+    for (int i = 0; i < timings.size(); i++) {
+      double ratioFirst = timings.get(0) / timings.get(i);
+      assertTrue(ratioFirst < 3);
+      assertTrue(ratioFirst > 0.3);
+    }
+  }
+  
+  public void testLocalityGroups() throws Exception {
+    
+    Map<String,Set<ByteSequence>> lggroups1 = new HashMap<String,Set<ByteSequence>>();
+    lggroups1.put("lg1", newCFSet("cf1", "cf2"));
+    lggroups1.put("lg2", newCFSet("cf3", "cf4"));
+    
+    InMemoryMap imm = new InMemoryMap(lggroups1, false, "/tmp");
+    
+    Mutation m1 = new Mutation("r1");
+    m1.put("cf1", "x", 2, "1");
+    m1.put("cf1", "y", 2, "2");
+    m1.put("cf3", "z", 2, "3");
+    m1.put("foo", "b", 2, "9");
+    
+    Mutation m2 = new Mutation("r2");
+    m2.put("cf2", "x", 3, "5");
+    
+    Mutation m3 = new Mutation("r3");
+    m3.put("foo", "b", 4, "6");
+    
+    Mutation m4 = new Mutation("r4");
+    m4.put("foo", "b", 5, "7");
+    m4.put("cf4", "z", 5, "8");
+    
+    Mutation m5 = new Mutation("r5");
+    m5.put("cf3", "z", 6, "A");
+    m5.put("cf4", "z", 6, "B");
+    
+    imm.mutate(Arrays.asList(m1, m2, m3, m4, m5));
+    
+    MemoryIterator iter1 = imm.skvIterator();
+    
+    seekLocalityGroups(iter1);
+    SortedKeyValueIterator<Key,Value> dc1 = iter1.deepCopy(null);
+    seekLocalityGroups(dc1);
+    
+    assertTrue(imm.getNumEntries() == 10);
+    assertTrue(imm.estimatedSizeInBytes() > 0);
+
+    imm.delete(0);
+
+    seekLocalityGroups(iter1);
+    seekLocalityGroups(dc1);
+    // TODO uncomment following when ACCUMULO-1628 is fixed
+    // seekLocalityGroups(iter1.deepCopy(null));
+  }
+
+  private void seekLocalityGroups(SortedKeyValueIterator<Key,Value> iter1) throws IOException {
+    iter1.seek(new Range(), newCFSet("cf1"), true);
+    ae(iter1, "r1", "cf1:x", 2, "1");
+    ae(iter1, "r1", "cf1:y", 2, "2");
+    ae(iter1, "r2", "cf2:x", 3, "5");
+    assertFalse(iter1.hasTop());
+    
+    iter1.seek(new Range("r2", "r4"), newCFSet("cf1"), true);
+    ae(iter1, "r2", "cf2:x", 3, "5");
+    assertFalse(iter1.hasTop());
+
+    iter1.seek(new Range(), newCFSet("cf3"), true);
+    ae(iter1, "r1", "cf3:z", 2, "3");
+    ae(iter1, "r4", "cf4:z", 5, "8");
+    ae(iter1, "r5", "cf3:z", 6, "A");
+    ae(iter1, "r5", "cf4:z", 6, "B");
+    assertFalse(iter1.hasTop());
+    
+    iter1.seek(new Range(), newCFSet("foo"), true);
+    ae(iter1, "r1", "foo:b", 2, "9");
+    ae(iter1, "r3", "foo:b", 4, "6");
+    ae(iter1, "r4", "foo:b", 5, "7");
+    assertFalse(iter1.hasTop());
+    
+    iter1.seek(new Range(), newCFSet("cf1", "cf3"), true);
+    ae(iter1, "r1", "cf1:x", 2, "1");
+    ae(iter1, "r1", "cf1:y", 2, "2");
+    ae(iter1, "r1", "cf3:z", 2, "3");
+    ae(iter1, "r2", "cf2:x", 3, "5");
+    ae(iter1, "r4", "cf4:z", 5, "8");
+    ae(iter1, "r5", "cf3:z", 6, "A");
+    ae(iter1, "r5", "cf4:z", 6, "B");
+    assertFalse(iter1.hasTop());
+    
+    iter1.seek(new Range("r2", "r4"), newCFSet("cf1", "cf3"), true);
+    ae(iter1, "r2", "cf2:x", 3, "5");
+    ae(iter1, "r4", "cf4:z", 5, "8");
+    assertFalse(iter1.hasTop());
+
+    iter1.seek(new Range(), newCFSet("cf1", "cf3", "foo"), true);
+    assertAll(iter1);
+    
+    iter1.seek(new Range("r1", "r2"), newCFSet("cf1", "cf3", "foo"), true);
+    ae(iter1, "r1", "cf1:x", 2, "1");
+    ae(iter1, "r1", "cf1:y", 2, "2");
+    ae(iter1, "r1", "cf3:z", 2, "3");
+    ae(iter1, "r1", "foo:b", 2, "9");
+    ae(iter1, "r2", "cf2:x", 3, "5");
+    assertFalse(iter1.hasTop());
+
+    iter1.seek(new Range(), LocalityGroupUtil.EMPTY_CF_SET, false);
+    assertAll(iter1);
+    
+    iter1.seek(new Range(), newCFSet("cf1"), false);
+    assertAll(iter1);
+    
+    iter1.seek(new Range(), newCFSet("cf1", "cf2"), false);
+    ae(iter1, "r1", "cf3:z", 2, "3");
+    ae(iter1, "r1", "foo:b", 2, "9");
+    ae(iter1, "r3", "foo:b", 4, "6");
+    ae(iter1, "r4", "cf4:z", 5, "8");
+    ae(iter1, "r4", "foo:b", 5, "7");
+    ae(iter1, "r5", "cf3:z", 6, "A");
+    ae(iter1, "r5", "cf4:z", 6, "B");
+    assertFalse(iter1.hasTop());
+
+    iter1.seek(new Range("r2"), newCFSet("cf1", "cf3", "foo"), true);
+    ae(iter1, "r2", "cf2:x", 3, "5");
+    assertFalse(iter1.hasTop());
+  }
+
+  private void assertAll(SortedKeyValueIterator<Key,Value> iter1) throws IOException {
+    ae(iter1, "r1", "cf1:x", 2, "1");
+    ae(iter1, "r1", "cf1:y", 2, "2");
+    ae(iter1, "r1", "cf3:z", 2, "3");
+    ae(iter1, "r1", "foo:b", 2, "9");
+    ae(iter1, "r2", "cf2:x", 3, "5");
+    ae(iter1, "r3", "foo:b", 4, "6");
+    ae(iter1, "r4", "cf4:z", 5, "8");
+    ae(iter1, "r4", "foo:b", 5, "7");
+    ae(iter1, "r5", "cf3:z", 6, "A");
+    ae(iter1, "r5", "cf4:z", 6, "B");
+    assertFalse(iter1.hasTop());
+  }
+}
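
A recurring pattern in the tests above: a MemoryIterator obtained from skvIterator() is a point-in-time snapshot, so mutations applied after the iterator was created (and even a later delete()) are not visible through it. Condensed from test2, using the test's own mutate helper:

    InMemoryMap imm = new InMemoryMap(false, "/tmp");

    MemoryIterator before = imm.skvIterator();   // snapshot taken before the write
    mutate(imm, "r1", "foo:cq1", 3, "bar1");
    MemoryIterator after = imm.skvIterator();    // snapshot taken after the write

    before.seek(new Range(), LocalityGroupUtil.EMPTY_CF_SET, false);
    assertFalse(before.hasTop());                // the earlier snapshot sees nothing

    after.seek(new Range(), LocalityGroupUtil.EMPTY_CF_SET, false);
    assertTrue(after.hasTop());                  // the later snapshot sees the mutation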

http://git-wip-us.apache.org/repos/asf/accumulo/blob/162bd40d/server/tserver/src/test/java/org/apache/accumulo/server/tabletserver/log/MultiReaderTest.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/test/java/org/apache/accumulo/server/tabletserver/log/MultiReaderTest.java b/server/tserver/src/test/java/org/apache/accumulo/server/tabletserver/log/MultiReaderTest.java
new file mode 100644
index 0000000..c69baa4
--- /dev/null
+++ b/server/tserver/src/test/java/org/apache/accumulo/server/tabletserver/log/MultiReaderTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.tabletserver.log;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.fs.VolumeManagerImpl;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.MapFile.Writer;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class MultiReaderTest {
+  
+  VolumeManager fs;
+  TemporaryFolder root = new TemporaryFolder();
+  
+  @Before
+  public void setUp() throws Exception {
+    // quiet log messages about compress.CodecPool
+    Logger.getRootLogger().setLevel(Level.ERROR);
+    fs = VolumeManagerImpl.getLocal();
+    root.create();
+    String path = root.getRoot().getAbsolutePath();
+    Path manyMaps = new Path("file://" + path + "/manyMaps"); // avoid shadowing the TemporaryFolder field
+    fs.mkdirs(manyMaps);
+    fs.create(new Path(manyMaps, "finished")).close();
+    FileSystem ns = fs.getDefaultVolume();
+    Writer writer = new Writer(ns.getConf(), ns, new Path(manyMaps, "odd").toString(), IntWritable.class, BytesWritable.class);
+    BytesWritable value = new BytesWritable("someValue".getBytes());
+    for (int i = 1; i < 1000; i += 2) {
+      writer.append(new IntWritable(i), value);
+    }
+    writer.close();
+    writer = new Writer(ns.getConf(), ns, new Path(manyMaps, "even").toString(), IntWritable.class, BytesWritable.class);
+    for (int i = 0; i < 1000; i += 2) {
+      if (i == 10)
+        continue;
+      writer.append(new IntWritable(i), value);
+    }
+    writer.close();
+  }
+  
+  @After
+  public void tearDown() throws Exception {
+    root.delete();
+  }
+  
+  private void scan(MultiReader reader, int start) throws IOException {
+    IntWritable key = new IntWritable();
+    BytesWritable value = new BytesWritable();
+    
+    for (int i = start + 1; i < 1000; i++) {
+      if (i == 10)
+        continue;
+      assertTrue(reader.next(key, value));
+      assertEquals(i, key.get());
+    }
+  }
+  
+  private void scanOdd(MultiReader reader, int start) throws IOException {
+    IntWritable key = new IntWritable();
+    BytesWritable value = new BytesWritable();
+    
+    for (int i = start + 2; i < 1000; i += 2) {
+      assertTrue(reader.next(key, value));
+      assertEquals(i, key.get());
+    }
+  }
+  
+  @Test
+  public void testMultiReader() throws IOException {
+    Path manyMaps = new Path("file://" + root.getRoot().getAbsolutePath() + "/manyMaps");
+    MultiReader reader = new MultiReader(fs, manyMaps);
+    IntWritable key = new IntWritable();
+    BytesWritable value = new BytesWritable();
+    
+    for (int i = 0; i < 1000; i++) {
+      if (i == 10)
+        continue;
+      assertTrue(reader.next(key, value));
+      assertEquals(i, key.get());
+    }
+    assertEquals(0, value.compareTo(new BytesWritable("someValue".getBytes())));
+    assertFalse(reader.next(key, value));
+    
+    key.set(500);
+    assertTrue(reader.seek(key));
+    scan(reader, 500);
+    key.set(10);
+    assertFalse(reader.seek(key));
+    scan(reader, 10);
+    key.set(1000);
+    assertFalse(reader.seek(key));
+    assertFalse(reader.next(key, value));
+    key.set(-1);
+    assertFalse(reader.seek(key));
+    key.set(0);
+    assertTrue(reader.next(key, value));
+    assertEquals(0, key.get());
+    reader.close();
+    
+    fs.deleteRecursively(new Path(manyMaps, "even"));
+    reader = new MultiReader(fs, manyMaps);
+    key.set(501);
+    assertTrue(reader.seek(key));
+    scanOdd(reader, 501);
+    key.set(1000);
+    assertFalse(reader.seek(key));
+    assertFalse(reader.next(key, value));
+    key.set(-1);
+    assertFalse(reader.seek(key));
+    key.set(1);
+    assertTrue(reader.next(key, value));
+    assertEquals(1, key.get());
+    reader.close();
+    
+  }
+  
+}
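
The seek semantics exercised above: MultiReader merges several sorted map files into one stream, seek(key) returns true only when the exact key is present, and an unsuccessful seek still positions the reader so that next() resumes at the first key after the one sought. Condensed from testMultiReader, given a MultiReader reader over the directory built in setUp:

    IntWritable key = new IntWritable();
    BytesWritable value = new BytesWritable();

    key.set(500);
    assertTrue(reader.seek(key));    // 500 exists in the "even" file
    key.set(10);
    assertFalse(reader.seek(key));   // 10 was deliberately skipped when writing
    assertTrue(reader.next(key, value));
    assertEquals(11, key.get());     // next() resumes just past the missing key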

http://git-wip-us.apache.org/repos/asf/accumulo/blob/162bd40d/server/tserver/src/test/java/org/apache/accumulo/server/tabletserver/log/SortedLogRecoveryTest.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/test/java/org/apache/accumulo/server/tabletserver/log/SortedLogRecoveryTest.java b/server/tserver/src/test/java/org/apache/accumulo/server/tabletserver/log/SortedLogRecoveryTest.java
new file mode 100644
index 0000000..37d7774
--- /dev/null
+++ b/server/tserver/src/test/java/org/apache/accumulo/server/tabletserver/log/SortedLogRecoveryTest.java
@@ -0,0 +1,602 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.tabletserver.log;
+
+import static org.apache.accumulo.tserver.logger.LogEvents.COMPACTION_FINISH;
+import static org.apache.accumulo.tserver.logger.LogEvents.COMPACTION_START;
+import static org.apache.accumulo.tserver.logger.LogEvents.DEFINE_TABLET;
+import static org.apache.accumulo.tserver.logger.LogEvents.MUTATION;
+import static org.apache.accumulo.tserver.logger.LogEvents.OPEN;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.server.data.ServerMutation;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.fs.VolumeManagerImpl;
+import org.apache.accumulo.tserver.logger.LogEvents;
+import org.apache.accumulo.tserver.logger.LogFileKey;
+import org.apache.accumulo.tserver.logger.LogFileValue;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.MapFile;
+import org.apache.hadoop.io.MapFile.Writer;
+import org.apache.hadoop.io.Text;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class SortedLogRecoveryTest {
+  
+  static final KeyExtent extent = new KeyExtent(new Text("table"), null, null);
+  static final Text cf = new Text("cf");
+  static final Text cq = new Text("cq");
+  static final Value value = new Value("value".getBytes());
+  
+  static class KeyValue implements Comparable<KeyValue> {
+    public final LogFileKey key;
+    public final LogFileValue value;
+    
+    KeyValue() {
+      key = new LogFileKey();
+      value = new LogFileValue();
+    }
+    
+    @Override
+    public int compareTo(KeyValue o) {
+      return key.compareTo(o.key);
+    }
+  }
+  
+  private static KeyValue createKeyValue(LogEvents type, long seq, int tid, Object fileExtentMutation) {
+    KeyValue result = new KeyValue();
+    result.key.event = type;
+    result.key.seq = seq;
+    result.key.tid = tid;
+    switch (type) {
+      case OPEN:
+        result.key.tserverSession = (String) fileExtentMutation;
+        break;
+      case COMPACTION_FINISH:
+        break;
+      case COMPACTION_START:
+        result.key.filename = (String) fileExtentMutation;
+        break;
+      case DEFINE_TABLET:
+        result.key.tablet = (KeyExtent) fileExtentMutation;
+        break;
+      case MUTATION:
+        result.value.mutations = Arrays.asList((Mutation) fileExtentMutation);
+        break;
+      case MANY_MUTATIONS:
+        result.value.mutations = Arrays.asList((Mutation[])fileExtentMutation);
+    }
+    return result;
+  }
+  
+  private static class CaptureMutations implements MutationReceiver {
+    public ArrayList<Mutation> result = new ArrayList<Mutation>();
+    
+    @Override
+    public void receive(Mutation m) {
+      // capture the mutation as-is; no copy is made
+      result.add(m);
+    }
+  }
+  
+  private static List<Mutation> recover(Map<String,KeyValue[]> logs, KeyExtent extent) throws IOException {
+    return recover(logs, new HashSet<String>(), extent);
+  }
+  
+  private static List<Mutation> recover(Map<String,KeyValue[]> logs, Set<String> files, KeyExtent extent) throws IOException {
+    TemporaryFolder root = new TemporaryFolder();
+    root.create();
+    final String workdir = "file://" + root.getRoot().getAbsolutePath() + "/workdir";
+    VolumeManager fs = VolumeManagerImpl.getLocal();
+    fs.deleteRecursively(new Path(workdir));
+    ArrayList<Path> dirs = new ArrayList<Path>();
+    try {
+      for (Entry<String,KeyValue[]> entry : logs.entrySet()) {
+        String path = workdir + "/" + entry.getKey();
+        FileSystem ns = fs.getFileSystemByPath(new Path(path));
+        Writer map = new MapFile.Writer(ns.getConf(), ns, path + "/log1", LogFileKey.class, LogFileValue.class);
+        for (KeyValue lfe : entry.getValue()) {
+          map.append(lfe.key, lfe.value);
+        }
+        map.close();
+        ns.create(new Path(path, "finished")).close();
+        dirs.add(new Path(path));
+      }
+      // Recover
+      SortedLogRecovery recovery = new SortedLogRecovery(fs);
+      CaptureMutations capture = new CaptureMutations();
+      recovery.recover(extent, dirs, files, capture);
+      return capture.result;
+    } finally {
+      root.delete();
+    }
+  }
+  
+  @Test
+  public void testCompactionCrossesLogs() throws IOException {
+    Mutation ignored = new ServerMutation(new Text("ignored"));
+    ignored.put(cf, cq, value);
+    Mutation m = new ServerMutation(new Text("row1"));
+    m.put(cf, cq, value);
+    Mutation m2 = new ServerMutation(new Text("row2"));
+    m2.put(cf, cq, value);
+    KeyValue entries[] = new KeyValue[] {createKeyValue(OPEN, 0, 1, "2"), createKeyValue(DEFINE_TABLET, 1, 1, extent),
+        createKeyValue(COMPACTION_START, 3, 1, "somefile"), createKeyValue(MUTATION, 2, 1, ignored), createKeyValue(MUTATION, 4, 1, ignored),};
+    KeyValue entries2[] = new KeyValue[] {createKeyValue(OPEN, 0, 1, "2"), createKeyValue(DEFINE_TABLET, 1, 1, extent),
+        createKeyValue(COMPACTION_START, 4, 1, "somefile"), createKeyValue(MUTATION, 7, 1, m),};
+    KeyValue entries3[] = new KeyValue[] {createKeyValue(OPEN, 0, 2, "23"), createKeyValue(DEFINE_TABLET, 1, 2, extent),
+        createKeyValue(COMPACTION_START, 5, 2, "newfile"), createKeyValue(COMPACTION_FINISH, 6, 2, null), createKeyValue(MUTATION, 3, 2, ignored),
+        createKeyValue(MUTATION, 4, 2, ignored),};
+    KeyValue entries4[] = new KeyValue[] {createKeyValue(OPEN, 0, 3, "69"), createKeyValue(DEFINE_TABLET, 1, 3, extent),
+        createKeyValue(MUTATION, 2, 3, ignored), createKeyValue(MUTATION, 3, 3, ignored), createKeyValue(MUTATION, 4, 3, ignored),};
+    KeyValue entries5[] = new KeyValue[] {createKeyValue(OPEN, 0, 4, "70"), createKeyValue(DEFINE_TABLET, 1, 4, extent),
+        createKeyValue(COMPACTION_START, 3, 4, "thisfile"), createKeyValue(MUTATION, 2, 4, ignored), createKeyValue(MUTATION, 6, 4, m2),};
+    
+    Map<String,KeyValue[]> logs = new TreeMap<String,KeyValue[]>();
+    logs.put("entries", entries);
+    logs.put("entries2", entries2);
+    logs.put("entries3", entries3);
+    logs.put("entries4", entries4);
+    logs.put("entries5", entries5);
+    
+    // Recover
+    List<Mutation> mutations = recover(logs, extent);
+    
+    // Verify recovered data
+    Assert.assertEquals(2, mutations.size());
+    Assert.assertEquals(m, mutations.get(0));
+    Assert.assertEquals(m2, mutations.get(1));
+  }
+  
+  @Test
+  public void testCompactionCrossesLogs5() throws IOException {
+    // Create a test log
+    Mutation ignored = new ServerMutation(new Text("ignored"));
+    ignored.put(cf, cq, value);
+    Mutation m = new ServerMutation(new Text("row1"));
+    m.put(cf, cq, value);
+    Mutation m2 = new ServerMutation(new Text("row2"));
+    m2.put(cf, cq, value);
+    Mutation m3 = new ServerMutation(new Text("row3"));
+    m3.put(cf, cq, value);
+    Mutation m4 = new ServerMutation(new Text("row4"));
+    m4.put(cf, cq, value);
+    KeyValue entries[] = new KeyValue[] {createKeyValue(OPEN, 0, -1, "1"), createKeyValue(DEFINE_TABLET, 1, 1, extent),
+        createKeyValue(COMPACTION_START, 3, 1, "somefile"), createKeyValue(MUTATION, 2, 1, ignored), createKeyValue(MUTATION, 4, 1, ignored),};
+    KeyValue entries2[] = new KeyValue[] {createKeyValue(OPEN, 5, -1, "2"), createKeyValue(DEFINE_TABLET, 6, 1, extent),
+        createKeyValue(MUTATION, 7, 1, ignored),};
+    KeyValue entries3[] = new KeyValue[] {createKeyValue(OPEN, 8, -1, "3"), createKeyValue(DEFINE_TABLET, 9, 1, extent),
+        createKeyValue(COMPACTION_FINISH, 10, 1, null), createKeyValue(COMPACTION_START, 12, 1, "newfile"), createKeyValue(COMPACTION_FINISH, 13, 1, null),
+        // createKeyValue(COMPACTION_FINISH, 14, 1, null),
+        createKeyValue(MUTATION, 11, 1, ignored), createKeyValue(MUTATION, 15, 1, m), createKeyValue(MUTATION, 16, 1, m2),};
+    KeyValue entries4[] = new KeyValue[] {createKeyValue(OPEN, 17, -1, "4"), createKeyValue(DEFINE_TABLET, 18, 1, extent),
+        createKeyValue(COMPACTION_START, 20, 1, "file"), createKeyValue(MUTATION, 19, 1, m3), createKeyValue(MUTATION, 21, 1, m4),};
+    Map<String,KeyValue[]> logs = new TreeMap<String,KeyValue[]>();
+    logs.put("entries", entries);
+    logs.put("entries2", entries2);
+    logs.put("entries3", entries3);
+    logs.put("entries4", entries4);
+    // Recover
+    List<Mutation> mutations = recover(logs, extent);
+    // Verify recovered data
+    Assert.assertEquals(4, mutations.size());
+    Assert.assertEquals(m, mutations.get(0));
+    Assert.assertEquals(m2, mutations.get(1));
+    Assert.assertEquals(m3, mutations.get(2));
+    Assert.assertEquals(m4, mutations.get(3));
+  }
+  
+  @Test
+  public void testCompactionCrossesLogs6() throws IOException {
+    // Create a test log
+    Mutation ignored = new ServerMutation(new Text("ignored"));
+    ignored.put(cf, cq, value);
+    Mutation m = new ServerMutation(new Text("row1"));
+    m.put(cf, cq, value);
+    Mutation m2 = new ServerMutation(new Text("row2"));
+    m2.put(cf, cq, value);
+    KeyValue entries[] = new KeyValue[] {createKeyValue(OPEN, 0, 1, "2"), createKeyValue(DEFINE_TABLET, 1, 1, extent), createKeyValue(MUTATION, 1, 1, ignored),
+        createKeyValue(MUTATION, 3, 1, m),};
+    KeyValue entries2[] = new KeyValue[] {createKeyValue(OPEN, 0, 1, "2"), createKeyValue(DEFINE_TABLET, 1, 1, extent),
+        createKeyValue(COMPACTION_START, 2, 1, "somefile"), createKeyValue(COMPACTION_FINISH, 3, 1, "somefile"), createKeyValue(MUTATION, 3, 1, m2),};
+    
+    Map<String,KeyValue[]> logs = new TreeMap<String,KeyValue[]>();
+    logs.put("entries", entries);
+    logs.put("entries2", entries2);
+    
+    // Recover
+    List<Mutation> mutations = recover(logs, extent);
+    
+    // Verify recovered data
+    Assert.assertEquals(2, mutations.size());
+    Assert.assertEquals(m, mutations.get(0));
+    Assert.assertEquals(m2, mutations.get(1));
+  }
+  
+  @Test
+  public void testEmpty() throws IOException {
+    // Create a test log
+    KeyValue entries[] = new KeyValue[] {createKeyValue(OPEN, 0, -1, "1"), createKeyValue(DEFINE_TABLET, 1, 1, extent),};
+    Map<String,KeyValue[]> logs = new TreeMap<String,KeyValue[]>();
+    logs.put("testlog", entries);
+    // Recover
+    List<Mutation> mutations = recover(logs, extent);
+    // Verify recovered data
+    Assert.assertEquals(0, mutations.size());
+  }
+  
+  @Test
+  public void testMissingDefinition() {
+    // Create a test log
+    KeyValue entries[] = new KeyValue[] {createKeyValue(OPEN, 0, -1, "1"),};
+    Map<String,KeyValue[]> logs = new TreeMap<String,KeyValue[]>();
+    logs.put("testlog", entries);
+    // Recover
+    try {
+      recover(logs, extent);
+      Assert.fail("tablet should not have been found");
+    } catch (Throwable t) {
+      // expected: recovery cannot proceed without a DEFINE_TABLET event for the extent
+    }
+  }
+  
+  @Test
+  public void testSimple() throws IOException {
+    // Create a test log
+    Mutation m = new ServerMutation(new Text("row1"));
+    m.put(cf, cq, value);
+    KeyValue entries[] = new KeyValue[] {createKeyValue(OPEN, 0, -1, "1"), createKeyValue(DEFINE_TABLET, 1, 1, extent), createKeyValue(MUTATION, 2, 1, m),};
+    Map<String,KeyValue[]> logs = new TreeMap<String,KeyValue[]>();
+    logs.put("testlog", entries);
+    // Recover
+    List<Mutation> mutations = recover(logs, extent);
+    // Verify recovered data
+    Assert.assertEquals(1, mutations.size());
+    Assert.assertEquals(m, mutations.get(0));
+  }
+  
+  @Test
+  public void testSkipSuccessfulCompaction() throws IOException {
+    // Create a test log
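+    // Scenario: a compaction starts and finishes within a single log; the mutation written
+    // before the compaction should be skipped and only the one after it recovered.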
+    Mutation ignored = new ServerMutation(new Text("ignored"));
+    ignored.put(cf, cq, value);
+    Mutation m = new ServerMutation(new Text("row1"));
+    m.put(cf, cq, value);
+    KeyValue entries[] = new KeyValue[] {createKeyValue(OPEN, 0, -1, "1"), createKeyValue(DEFINE_TABLET, 1, 1, extent),
+        createKeyValue(COMPACTION_START, 3, 1, "somefile"), createKeyValue(COMPACTION_FINISH, 4, 1, null), createKeyValue(MUTATION, 2, 1, ignored),
+        createKeyValue(MUTATION, 5, 1, m),};
+    Map<String,KeyValue[]> logs = new TreeMap<String,KeyValue[]>();
+    logs.put("testlog", entries);
+    // Recover
+    List<Mutation> mutations = recover(logs, extent);
+    // Verify recovered data
+    Assert.assertEquals(1, mutations.size());
+    Assert.assertEquals(m, mutations.get(0));
+  }
+  
+  @Test
+  public void testSkipSuccessfulCompactionAcrossFiles() throws IOException {
+    // Create a test log
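+    // Scenario: the compaction starts in one log and finishes in the next; the mutation
+    // captured before the start should be skipped, and the one after the finish recovered.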
+    Mutation ignored = new ServerMutation(new Text("ignored"));
+    ignored.put(cf, cq, value);
+    Mutation m = new ServerMutation(new Text("row1"));
+    m.put(cf, cq, value);
+    KeyValue entries[] = new KeyValue[] {createKeyValue(OPEN, 0, -1, "1"), createKeyValue(DEFINE_TABLET, 1, 1, extent),
+        createKeyValue(COMPACTION_START, 3, 1, "somefile"), createKeyValue(MUTATION, 2, 1, ignored),};
+    KeyValue entries2[] = new KeyValue[] {createKeyValue(OPEN, 4, -1, "1"), createKeyValue(DEFINE_TABLET, 5, 1, extent),
+        createKeyValue(COMPACTION_FINISH, 6, 1, null), createKeyValue(MUTATION, 7, 1, m),};
+    Map<String,KeyValue[]> logs = new TreeMap<String,KeyValue[]>();
+    logs.put("entries", entries);
+    logs.put("entries2", entries2);
+    // Recover
+    List<Mutation> mutations = recover(logs, extent);
+    // Verify recovered data
+    Assert.assertEquals(1, mutations.size());
+    Assert.assertEquals(m, mutations.get(0));
+  }
+  
+  @Test
+  public void testGetMutationsAfterCompactionStart() throws IOException {
+    // Create a test log
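+    // Scenario: a mutation is logged after the compaction start, with the finish arriving in
+    // a later log; that mutation must not be lost during recovery.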
+    Mutation ignored = new ServerMutation(new Text("ignored"));
+    ignored.put(cf, cq, value);
+    Mutation m = new ServerMutation(new Text("row1"));
+    m.put(cf, cq, value);
+    Mutation m2 = new ServerMutation(new Text("row2"));
+    m2.put(cf, cq, new Value("123".getBytes()));
+    KeyValue entries[] = new KeyValue[] {createKeyValue(OPEN, 0, -1, "1"), createKeyValue(DEFINE_TABLET, 1, 1, extent),
+        createKeyValue(COMPACTION_START, 3, 1, "somefile"), createKeyValue(MUTATION, 2, 1, ignored), createKeyValue(MUTATION, 4, 1, m),};
+    KeyValue entries2[] = new KeyValue[] {createKeyValue(OPEN, 5, -1, "1"), createKeyValue(DEFINE_TABLET, 6, 1, extent),
+        createKeyValue(COMPACTION_FINISH, 7, 1, null), createKeyValue(MUTATION, 8, 1, m2),};
+    Map<String,KeyValue[]> logs = new TreeMap<String,KeyValue[]>();
+    logs.put("entries", entries);
+    logs.put("entries2", entries2);
+    // Recover
+    List<Mutation> mutations = recover(logs, extent);
+    // Verify recovered data
+    Assert.assertEquals(2, mutations.size());
+    Assert.assertEquals(m, mutations.get(0));
+    Assert.assertEquals(m2, mutations.get(1));
+  }
+  
+  @Test
+  public void testDoubleFinish() throws IOException {
+    // Create a test log
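+    // Scenario: a stray compaction finish with no preceding start appears before a normal
+    // start/finish pair; mutations after the final start should still be recovered.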
+    Mutation ignored = new ServerMutation(new Text("ignored"));
+    ignored.put(cf, cq, value);
+    Mutation m = new ServerMutation(new Text("row1"));
+    m.put(cf, cq, value);
+    Mutation m2 = new ServerMutation(new Text("row2"));
+    m2.put(cf, cq, new Value("123".getBytes()));
+    KeyValue entries[] = new KeyValue[] {createKeyValue(OPEN, 0, -1, "1"), createKeyValue(DEFINE_TABLET, 1, 1, extent),
+        createKeyValue(COMPACTION_FINISH, 2, 1, null), createKeyValue(COMPACTION_START, 4, 1, "somefile"), createKeyValue(COMPACTION_FINISH, 6, 1, null),
+        createKeyValue(MUTATION, 3, 1, ignored), createKeyValue(MUTATION, 5, 1, m), createKeyValue(MUTATION, 7, 1, m2),};
+    Map<String,KeyValue[]> logs = new TreeMap<String,KeyValue[]>();
+    logs.put("entries", entries);
+    // Recover
+    List<Mutation> mutations = recover(logs, extent);
+    // Verify recovered data
+    Assert.assertEquals(2, mutations.size());
+    Assert.assertEquals(m, mutations.get(0));
+    Assert.assertEquals(m2, mutations.get(1));
+  }
+  
+  @Test
+  public void testCompactionCrossesLogs2() throws IOException {
+    // Create a test log
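+    // Scenario: the compaction start and its finish are separated by an intervening log;
+    // every mutation after the start should survive recovery.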
+    Mutation ignored = new ServerMutation(new Text("ignored"));
+    ignored.put(cf, cq, value);
+    Mutation m = new ServerMutation(new Text("row1"));
+    m.put(cf, cq, value);
+    Mutation m2 = new ServerMutation(new Text("row2"));
+    m2.put(cf, cq, value);
+    Mutation m3 = new ServerMutation(new Text("row3"));
+    m3.put(cf, cq, value);
+    KeyValue entries[] = new KeyValue[] {createKeyValue(OPEN, 0, -1, "1"), createKeyValue(DEFINE_TABLET, 1, 1, extent),
+        createKeyValue(COMPACTION_START, 3, 1, "somefile"), createKeyValue(MUTATION, 2, 1, ignored), createKeyValue(MUTATION, 4, 1, m),};
+    KeyValue entries2[] = new KeyValue[] {createKeyValue(OPEN, 5, -1, "1"), createKeyValue(DEFINE_TABLET, 6, 1, extent), createKeyValue(MUTATION, 7, 1, m2),};
+    KeyValue entries3[] = new KeyValue[] {createKeyValue(OPEN, 8, -1, "1"), createKeyValue(DEFINE_TABLET, 9, 1, extent),
+        createKeyValue(COMPACTION_FINISH, 10, 1, null), createKeyValue(MUTATION, 11, 1, m3),};
+    Map<String,KeyValue[]> logs = new TreeMap<String,KeyValue[]>();
+    logs.put("entries", entries);
+    logs.put("entries2", entries2);
+    logs.put("entries3", entries3);
+    // Recover
+    List<Mutation> mutations = recover(logs, extent);
+    // Verify recovered data
+    Assert.assertEquals(3, mutations.size());
+    Assert.assertEquals(m, mutations.get(0));
+    Assert.assertEquals(m2, mutations.get(1));
+    Assert.assertEquals(m3, mutations.get(2));
+  }
+  
+  @Test
+  public void testBug1() throws IOException {
+    // This unit test reproduces a previously observed bug; nothing should be recovered.
+    Mutation m1 = new ServerMutation(new Text("row1"));
+    m1.put(cf, cq, value);
+    Mutation m2 = new ServerMutation(new Text("row2"));
+    m2.put(cf, cq, value);
+    KeyValue entries[] = new KeyValue[] {createKeyValue(OPEN, 0, -1, "1"), createKeyValue(DEFINE_TABLET, 1, 1, extent),
+        createKeyValue(COMPACTION_START, 30, 1, "somefile"), createKeyValue(COMPACTION_FINISH, 32, 1, "somefile"), createKeyValue(MUTATION, 29, 1, m1),
+        createKeyValue(MUTATION, 30, 1, m2),};
+    Map<String,KeyValue[]> logs = new TreeMap<String,KeyValue[]>();
+    logs.put("testlog", entries);
+    // Recover
+    List<Mutation> mutations = recover(logs, extent);
+    // Verify recovered data
+    Assert.assertEquals(0, mutations.size());
+  }
+  
+  @Test
+  public void testBug2() throws IOException {
+    // Create a test log
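+    // Scenario: a second compaction starts in a later log but never finishes; its mutations
+    // must not be skipped.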
+    Mutation m = new ServerMutation(new Text("row1"));
+    m.put(cf, cq, value);
+    Mutation m2 = new ServerMutation(new Text("row2"));
+    m2.put(cf, cq, value);
+    Mutation m3 = new ServerMutation(new Text("row3"));
+    m3.put(cf, cq, value);
+    KeyValue entries[] = new KeyValue[] {createKeyValue(OPEN, 0, -1, "1"), createKeyValue(DEFINE_TABLET, 1, 1, extent),
+        createKeyValue(COMPACTION_START, 2, 1, "somefile"), createKeyValue(COMPACTION_FINISH, 4, 1, null), createKeyValue(MUTATION, 3, 1, m),};
+    KeyValue entries2[] = new KeyValue[] {createKeyValue(OPEN, 5, -1, "1"), createKeyValue(DEFINE_TABLET, 6, 1, extent),
+        createKeyValue(COMPACTION_START, 8, 1, "somefile"), createKeyValue(MUTATION, 7, 1, m2), createKeyValue(MUTATION, 9, 1, m3),};
+    Map<String,KeyValue[]> logs = new TreeMap<String,KeyValue[]>();
+    logs.put("entries", entries);
+    logs.put("entries2", entries2);
+    // Recover
+    List<Mutation> mutations = recover(logs, extent);
+    // Verify recovered data
+    Assert.assertEquals(3, mutations.size());
+    Assert.assertEquals(m, mutations.get(0));
+    Assert.assertEquals(m2, mutations.get(1));
+    Assert.assertEquals(m3, mutations.get(2));
+  }
+  
+  @Test
+  public void testCompactionCrossesLogs4() throws IOException {
+    // Create a test log
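+    // Scenario: compactions start in the first and third logs but never finish; since no
+    // compaction completed, none of the mutations may be skipped.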
+    Mutation m = new ServerMutation(new Text("row1"));
+    m.put(cf, cq, value);
+    Mutation m2 = new ServerMutation(new Text("row2"));
+    m2.put(cf, cq, value);
+    Mutation m3 = new ServerMutation(new Text("row3"));
+    m3.put(cf, cq, value);
+    Mutation m4 = new ServerMutation(new Text("row4"));
+    m4.put(cf, cq, value);
+    Mutation m5 = new ServerMutation(new Text("row5"));
+    m5.put(cf, cq, value);
+    Mutation m6 = new ServerMutation(new Text("row6"));
+    m6.put(cf, cq, value);
+    KeyValue entries[] = new KeyValue[] {createKeyValue(OPEN, 0, -1, "1"), createKeyValue(DEFINE_TABLET, 1, 1, extent),
+        createKeyValue(COMPACTION_START, 4, 1, "somefile"),
+        // createKeyValue(COMPACTION_FINISH, 5, 1, null),
+        createKeyValue(MUTATION, 2, 1, m), createKeyValue(MUTATION, 3, 1, m2),};
+    KeyValue entries2[] = new KeyValue[] {createKeyValue(OPEN, 5, -1, "2"), createKeyValue(DEFINE_TABLET, 6, 1, extent), createKeyValue(MUTATION, 7, 1, m3),
+        createKeyValue(MUTATION, 8, 1, m4),};
+    KeyValue entries3[] = new KeyValue[] {createKeyValue(OPEN, 9, -1, "3"), createKeyValue(DEFINE_TABLET, 10, 1, extent),
+        // createKeyValue(COMPACTION_FINISH, 11, 1, null),
+        createKeyValue(COMPACTION_START, 12, 1, "somefile"),
+        // createKeyValue(COMPACTION_FINISH, 14, 1, null),
+        // createKeyValue(COMPACTION_START, 15, 1, "somefile"),
+        // createKeyValue(COMPACTION_FINISH, 17, 1, null),
+        // createKeyValue(COMPACTION_START, 18, 1, "somefile"),
+        // createKeyValue(COMPACTION_FINISH, 19, 1, null),
+        createKeyValue(MUTATION, 8, 1, m5), createKeyValue(MUTATION, 20, 1, m6),};
+    Map<String,KeyValue[]> logs = new TreeMap<String,KeyValue[]>();
+    logs.put("entries", entries);
+    logs.put("entries2", entries2);
+    logs.put("entries3", entries3);
+    // Recover
+    
+    List<Mutation> mutations = recover(logs, extent);
+    
+    // Verify recovered data
+    Assert.assertEquals(6, mutations.size());
+    Assert.assertEquals(m, mutations.get(0));
+    Assert.assertEquals(m2, mutations.get(1));
+    Assert.assertEquals(m3, mutations.get(2));
+    Assert.assertEquals(m4, mutations.get(3));
+    Assert.assertEquals(m5, mutations.get(4));
+    Assert.assertEquals(m6, mutations.get(5));
+  }
+  
+  @Test
+  public void testLookingForBug3() throws IOException {
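+    // Scenario: mutations that share a sequence number with the compaction finish event
+    // should still be recovered, as should everything from the second, unfinished compaction.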
+    Mutation ignored = new ServerMutation(new Text("ignored"));
+    ignored.put(cf, cq, value);
+    Mutation m = new ServerMutation(new Text("row1"));
+    m.put(cf, cq, value);
+    Mutation m2 = new ServerMutation(new Text("row2"));
+    m2.put(cf, cq, value);
+    Mutation m3 = new ServerMutation(new Text("row3"));
+    m3.put(cf, cq, value);
+    Mutation m4 = new ServerMutation(new Text("row4"));
+    m4.put(cf, cq, value);
+    Mutation m5 = new ServerMutation(new Text("row5"));
+    m5.put(cf, cq, value);
+    KeyValue entries[] = new KeyValue[] {createKeyValue(OPEN, 0, -1, "1"), createKeyValue(DEFINE_TABLET, 1, 1, extent),
+        createKeyValue(COMPACTION_START, 2, 1, "somefile"), createKeyValue(COMPACTION_FINISH, 3, 1, null), createKeyValue(MUTATION, 1, 1, ignored),
+        createKeyValue(MUTATION, 3, 1, m), createKeyValue(MUTATION, 3, 1, m2), createKeyValue(MUTATION, 3, 1, m3),};
+    KeyValue entries2[] = new KeyValue[] {createKeyValue(OPEN, 0, -1, "2"), createKeyValue(DEFINE_TABLET, 1, 1, extent),
+        createKeyValue(COMPACTION_START, 2, 1, "somefile2"), createKeyValue(MUTATION, 3, 1, m4), createKeyValue(MUTATION, 3, 1, m5),};
+    Map<String,KeyValue[]> logs = new TreeMap<String,KeyValue[]>();
+    logs.put("entries", entries);
+    logs.put("entries2", entries2);
+    // Recover
+    List<Mutation> mutations = recover(logs, extent);
+    // Verify recovered data
+    Assert.assertEquals(5, mutations.size());
+    Assert.assertEquals(m, mutations.get(0));
+    Assert.assertEquals(m2, mutations.get(1));
+    Assert.assertEquals(m3, mutations.get(2));
+    Assert.assertEquals(m4, mutations.get(3));
+    Assert.assertEquals(m5, mutations.get(4));
+  }
+  
+  @Test
+  public void testMultipleTabletDefinition() throws Exception {
+    // test for a tablet defined multiple times in a log file
+    // there was a bug where the oldest tablet id was used instead
+    // of the newest
+    
+    Mutation ignored = new ServerMutation(new Text("row1"));
+    ignored.put("foo", "bar", "v1");
+    Mutation m = new ServerMutation(new Text("row1"));
+    m.put("foo", "bar", "v1");
+    
+    KeyValue entries[] = new KeyValue[] {createKeyValue(OPEN, 0, -1, "1"), createKeyValue(DEFINE_TABLET, 1, 1, extent),
+        createKeyValue(DEFINE_TABLET, 1, 2, extent), createKeyValue(MUTATION, 2, 2, ignored), createKeyValue(COMPACTION_START, 3, 2, "somefile"),
+        createKeyValue(MUTATION, 4, 2, m), createKeyValue(COMPACTION_FINISH, 6, 2, null),};
+    
+    Arrays.sort(entries);
+    
+    Map<String,KeyValue[]> logs = new TreeMap<String,KeyValue[]>();
+    logs.put("entries", entries);
+    
+    List<Mutation> mutations = recover(logs, extent);
+    
+    Assert.assertEquals(1, mutations.size());
+    Assert.assertEquals(m, mutations.get(0));
+  }
+  
+  @Test
+  public void testNoFinish0() throws Exception {
+    // it's possible that a minor compaction finishes successfully, but the process dies before writing the compaction event
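+    // the file set passed to recover() names compaction output known to exist on disk,
+    // so the unfinished compaction is presumably treated as having completed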
+    
+    Mutation ignored = new ServerMutation(new Text("row1"));
+    ignored.put("foo", "bar", "v1");
+    
+    KeyValue entries[] = new KeyValue[] {createKeyValue(OPEN, 0, -1, "1"), createKeyValue(DEFINE_TABLET, 1, 2, extent),
+        createKeyValue(MUTATION, 2, 2, ignored), createKeyValue(COMPACTION_START, 3, 2, "/t/f1")};
+    
+    Arrays.sort(entries);
+    Map<String,KeyValue[]> logs = new TreeMap<String,KeyValue[]>();
+    logs.put("entries", entries);
+    
+    List<Mutation> mutations = recover(logs, Collections.singleton("/t/f1"), extent);
+    
+    Assert.assertEquals(0, mutations.size());
+  }
+  
+  @Test
+  public void testNoFinish1() throws Exception {
+    // it's possible that a minor compaction finishes successfully, but the process dies before writing the compaction event
+    
+    Mutation ignored = new ServerMutation(new Text("row1"));
+    ignored.put("foo", "bar", "v1");
+    Mutation m = new ServerMutation(new Text("row1"));
+    m.put("foo", "bar", "v2");
+    
+    KeyValue entries[] = new KeyValue[] {createKeyValue(OPEN, 0, -1, "1"), createKeyValue(DEFINE_TABLET, 1, 2, extent),
+        createKeyValue(MUTATION, 2, 2, ignored), createKeyValue(COMPACTION_START, 3, 2, "/t/f1"), createKeyValue(MUTATION, 4, 2, m),};
+    
+    Arrays.sort(entries);
+    Map<String,KeyValue[]> logs = new TreeMap<String,KeyValue[]>();
+    logs.put("entries", entries);
+    
+    List<Mutation> mutations = recover(logs, Collections.singleton("/t/f1"), extent);
+    
+    Assert.assertEquals(1, mutations.size());
+    Assert.assertEquals(m, mutations.get(0));
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/162bd40d/server/tserver/src/test/java/org/apache/accumulo/tserver/logger/LogFileTest.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/logger/LogFileTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/logger/LogFileTest.java
new file mode 100644
index 0000000..f91c195
--- /dev/null
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/logger/LogFileTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.logger;
+
+import static org.apache.accumulo.tserver.logger.LogEvents.COMPACTION_FINISH;
+import static org.apache.accumulo.tserver.logger.LogEvents.COMPACTION_START;
+import static org.apache.accumulo.tserver.logger.LogEvents.DEFINE_TABLET;
+import static org.apache.accumulo.tserver.logger.LogEvents.MANY_MUTATIONS;
+import static org.apache.accumulo.tserver.logger.LogEvents.MUTATION;
+import static org.apache.accumulo.tserver.logger.LogEvents.OPEN;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.server.data.ServerMutation;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+public class LogFileTest {
+  
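+  // Serializes one log entry to a buffer, deserializes it into keyResult/valueResult, and
+  // asserts that the round trip preserved everything and consumed the entire buffer.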
+  private static void readWrite(LogEvents event, long seq, int tid, String filename, KeyExtent tablet, Mutation[] mutations, LogFileKey keyResult,
+      LogFileValue valueResult) throws IOException {
+    LogFileKey key = new LogFileKey();
+    key.event = event;
+    key.seq = seq;
+    key.tid = tid;
+    key.filename = filename;
+    key.tablet = tablet;
+    key.tserverSession = keyResult.tserverSession;
+    LogFileValue value = new LogFileValue();
+    value.mutations = Arrays.asList(mutations != null ? mutations : new Mutation[0]);
+    DataOutputBuffer out = new DataOutputBuffer();
+    key.write(out);
+    value.write(out);
+    out.flush();
+    DataInputBuffer in = new DataInputBuffer();
+    in.reset(out.getData(), out.size());
+    keyResult.readFields(in);
+    valueResult.readFields(in);
+    assertEquals(0, key.compareTo(keyResult));
+    assertEquals(value.mutations, valueResult.mutations);
+    assertEquals(-1, in.read());
+  }
+  
+  @Test
+  public void testReadFields() throws IOException {
+    LogFileKey key = new LogFileKey();
+    LogFileValue value = new LogFileValue();
+    key.tserverSession = "";
+    readWrite(OPEN, -1, -1, null, null, null, key, value);
+    assertEquals(key.event, OPEN);
+    readWrite(COMPACTION_FINISH, 1, 2, null, null, null, key, value);
+    assertEquals(key.event, COMPACTION_FINISH);
+    assertEquals(key.seq, 1);
+    assertEquals(key.tid, 2);
+    readWrite(COMPACTION_START, 3, 4, "some file", null, null, key, value);
+    assertEquals(key.event, COMPACTION_START);
+    assertEquals(key.seq, 3);
+    assertEquals(key.tid, 4);
+    assertEquals(key.filename, "some file");
+    KeyExtent tablet = new KeyExtent(new Text("table"), new Text("bbbb"), new Text("aaaa"));
+    readWrite(DEFINE_TABLET, 5, 6, null, tablet, null, key, value);
+    assertEquals(key.event, DEFINE_TABLET);
+    assertEquals(key.seq, 5);
+    assertEquals(key.tid, 6);
+    assertEquals(key.tablet, tablet);
+    Mutation m = new ServerMutation(new Text("row"));
+    m.put(new Text("cf"), new Text("cq"), new Value("value".getBytes()));
+    readWrite(MUTATION, 7, 8, null, null, new Mutation[] {m}, key, value);
+    assertEquals(key.event, MUTATION);
+    assertEquals(key.seq, 7);
+    assertEquals(key.tid, 8);
+    assertEquals(value.mutations, Arrays.asList(m));
+    m = new ServerMutation(new Text("row"));
+    m.put(new Text("cf"), new Text("cq"), new ColumnVisibility("vis"), 12345, new Value("value".getBytes()));
+    m.put(new Text("cf"), new Text("cq"), new ColumnVisibility("vis2"), new Value("value".getBytes()));
+    m.putDelete(new Text("cf"), new Text("cq"), new ColumnVisibility("vis2"));
+    readWrite(MUTATION, 8, 9, null, null, new Mutation[] {m}, key, value);
+    assertEquals(key.event, MUTATION);
+    assertEquals(key.seq, 8);
+    assertEquals(key.tid, 9);
+    assertEquals(value.mutations, Arrays.asList(m));
+    readWrite(MANY_MUTATIONS, 9, 10, null, null, new Mutation[] {m, m}, key, value);
+    assertEquals(key.event, MANY_MUTATIONS);
+    assertEquals(key.seq, 9);
+    assertEquals(key.tid, 10);
+    assertEquals(value.mutations, Arrays.asList(m, m));
+  }
+  
+  @Test
+  public void testEventType() {
+    assertEquals(LogFileKey.eventType(MUTATION), LogFileKey.eventType(MANY_MUTATIONS));
+    assertEquals(LogFileKey.eventType(COMPACTION_START), LogFileKey.eventType(COMPACTION_FINISH));
+    assertTrue(LogFileKey.eventType(DEFINE_TABLET) < LogFileKey.eventType(COMPACTION_FINISH));
+    assertTrue(LogFileKey.eventType(COMPACTION_FINISH) < LogFileKey.eventType(MUTATION));
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/162bd40d/server/utils/pom.xml
----------------------------------------------------------------------
diff --git a/server/utils/pom.xml b/server/utils/pom.xml
index 51e5a71..32995ff 100644
--- a/server/utils/pom.xml
+++ b/server/utils/pom.xml
@@ -48,7 +48,7 @@
     </dependency>
     <dependency>
       <groupId>org.apache.accumulo</groupId>
-      <artifactId>accumulo-server-base</artifactId>
+      <artifactId>accumulo-master</artifactId>
     </dependency>
     <dependency>
       <groupId>org.apache.accumulo</groupId>
@@ -59,6 +59,10 @@
       <artifactId>accumulo-trace</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.apache.accumulo</groupId>
+      <artifactId>accumulo-tserver</artifactId>
+    </dependency>
+    <dependency>
       <groupId>org.apache.thrift</groupId>
       <artifactId>libthrift</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/accumulo/blob/162bd40d/server/utils/src/main/java/org/apache/accumulo/server/metanalysis/FilterMeta.java
----------------------------------------------------------------------
diff --git a/server/utils/src/main/java/org/apache/accumulo/server/metanalysis/FilterMeta.java b/server/utils/src/main/java/org/apache/accumulo/server/metanalysis/FilterMeta.java
new file mode 100644
index 0000000..83a8a41
--- /dev/null
+++ b/server/utils/src/main/java/org/apache/accumulo/server/metanalysis/FilterMeta.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.metanalysis;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.tserver.logger.LogEvents;
+import org.apache.accumulo.tserver.logger.LogFileKey;
+import org.apache.accumulo.tserver.logger.LogFileValue;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * A MapReduce job that takes a set of walogs and filters out all events that do not pertain to the metadata table.
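+ *
+ * Usage sketch (jar name and paths are hypothetical): the final argument is the output
+ * path; all preceding arguments are input walog files.
+ *
+ *   hadoop jar accumulo-server-utils.jar org.apache.accumulo.server.metanalysis.FilterMeta \
+ *       /walogs/log1 /walogs/log2 /tmp/filtered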
+ */
+public class FilterMeta extends Configured implements Tool {
+  
+  public static class FilterMapper extends Mapper<LogFileKey,LogFileValue,LogFileKey,LogFileValue> {
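+    // ids of tablets (from DEFINE_TABLET events) that belong to the metadata table;
+    // MUTATION/MANY_MUTATIONS events are passed through only for these ids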
+    private Set<Integer> tabletIds;
+    
+    @Override
+    protected void setup(Context context) throws IOException, InterruptedException {
+      tabletIds = new HashSet<Integer>();
+    }
+    
+    @Override
+    public void map(LogFileKey key, LogFileValue value, Context context) throws IOException, InterruptedException {
+      if (key.event == LogEvents.OPEN) {
+        context.write(key, value);
+      } else if (key.event == LogEvents.DEFINE_TABLET && key.tablet.getTableId().toString().equals(MetadataTable.ID)) {
+        tabletIds.add(key.tid);
+        context.write(key, value);
+      } else if ((key.event == LogEvents.MUTATION || key.event == LogEvents.MANY_MUTATIONS) && tabletIds.contains(key.tid)) {
+        context.write(key, value);
+      }
+    }
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    
+    String jobName = this.getClass().getSimpleName() + "_" + System.currentTimeMillis();
+    
+    Job job = new Job(getConf(), jobName);
+    job.setJarByClass(this.getClass());
+    
+    Path paths[] = new Path[args.length - 1];
+    for (int i = 0; i < paths.length; i++) {
+      paths[i] = new Path(args[i]);
+    }
+
+    job.setInputFormatClass(LogFileInputFormat.class);
+    LogFileInputFormat.setInputPaths(job, paths);
+    
+    job.setOutputFormatClass(LogFileOutputFormat.class);
+    LogFileOutputFormat.setOutputPath(job, new Path(args[args.length - 1]));
+
+    job.setMapperClass(FilterMapper.class);
+    
+    job.setNumReduceTasks(0);
+
+    job.waitForCompletion(true);
+    return job.isSuccessful() ? 0 : 1;
+  }
+  
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(CachedConfiguration.getInstance(), new FilterMeta(), args);
+    System.exit(res);
+  }
+}

