accumulo-commits mailing list archives

From: ctubb...@apache.org
Subject: [46/59] [abbrv] ACCUMULO-658 consistent package names to avoid overlapped sealed jars
Date: Sat, 07 Sep 2013 03:28:49 GMT
http://git-wip-us.apache.org/repos/asf/accumulo/blob/a10587ed/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java b/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java
deleted file mode 100644
index a10ecb7..0000000
--- a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java
+++ /dev/null
@@ -1,430 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.tabletserver.log;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.KeyExtent;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.util.UtilWaitThread;
-import org.apache.accumulo.server.fs.VolumeManager;
-import org.apache.accumulo.server.tabletserver.Tablet;
-import org.apache.accumulo.server.tabletserver.Tablet.CommitSession;
-import org.apache.accumulo.server.tabletserver.TabletMutations;
-import org.apache.accumulo.server.tabletserver.TabletServer;
-import org.apache.accumulo.server.tabletserver.log.DfsLogger.LoggerOperation;
-import org.apache.hadoop.fs.Path;
-import org.apache.log4j.Logger;
-
-/**
- * Central logging facility for the TServerInfo.
- * 
- * Forwards in-memory updates to remote logs, carefully writing the same data to every log, while maintaining the maximum thread parallelism for greater
- * performance. As new logs are used and minor compactions are performed, the metadata table is kept up-to-date.
- * 
- */
-public class TabletServerLogger {
-  
-  private static final Logger log = Logger.getLogger(TabletServerLogger.class);
-  
-  private final AtomicLong logSizeEstimate = new AtomicLong();
-  private final long maxSize;
-  
-  private final TabletServer tserver;
-  
-  // The current log set: always updated to a new set with every change of loggers
-  private final List<DfsLogger> loggers = new ArrayList<DfsLogger>();
-  
-  // The current generation of logSet.
-  // Because multiple threads can be using a log set at one time, a log
-  // failure is likely to affect multiple threads, who will all attempt to
-  // create a new logSet. This will cause many unnecessary updates to the
-  // metadata table.
-  // We'll use this generational counter to determine if another thread has
-  // already fetched a new logSet.
-  private AtomicInteger logSetId = new AtomicInteger();
-  
-  // Use a ReadWriteLock to allow multiple threads to use the log set, but obtain a write lock to change them
-  private final ReentrantReadWriteLock logSetLock = new ReentrantReadWriteLock();
-  
-  private final AtomicInteger seqGen = new AtomicInteger();
-  
-  private static boolean enabled(Tablet tablet) {
-    return tablet.getTableConfiguration().getBoolean(Property.TABLE_WALOG_ENABLED);
-  }
-  
-  private static boolean enabled(CommitSession commitSession) {
-    return enabled(commitSession.getTablet());
-  }
-  
-  static private abstract class TestCallWithWriteLock {
-    abstract boolean test();
-    
-    abstract void withWriteLock() throws IOException;
-  }
-  
-  /**
-   * Pattern taken from the documentation for ReentrantReadWriteLock
-   * 
-   * @param rwlock
-   *          lock to use
-   * @param code
-   *          a test/work pair
-   * @throws IOException
-   */
-  private static void testLockAndRun(final ReadWriteLock rwlock, TestCallWithWriteLock code) throws IOException {
-    // Get a read lock
-    rwlock.readLock().lock();
-    try {
-      // does some condition exist that needs the write lock?
-      if (code.test()) {
-        // Yes, let go of the readlock
-        rwlock.readLock().unlock();
-        // Grab the write lock
-        rwlock.writeLock().lock();
-        try {
-          // double-check the condition, since we let go of the lock
-          if (code.test()) {
-            // perform the work with the write lock held
-            code.withWriteLock();
-          }
-        } finally {
-          // regain the readlock
-          rwlock.readLock().lock();
-          // unlock the write lock
-          rwlock.writeLock().unlock();
-        }
-      }
-    } finally {
-      // always let go of the lock
-      rwlock.readLock().unlock();
-    }
-  }
-  
-  public TabletServerLogger(TabletServer tserver, long maxSize) {
-    this.tserver = tserver;
-    this.maxSize = maxSize;
-  }
-  
-  private int initializeLoggers(final List<DfsLogger> copy) throws IOException {
-    final int[] result = {-1};
-    testLockAndRun(logSetLock, new TestCallWithWriteLock() {
-      boolean test() {
-        copy.clear();
-        copy.addAll(loggers);
-        if (!loggers.isEmpty())
-          result[0] = logSetId.get();
-        return loggers.isEmpty();
-      }
-      
-      void withWriteLock() throws IOException {
-        try {
-          createLoggers();
-          copy.clear();
-          copy.addAll(loggers);
-          if (copy.size() > 0)
-            result[0] = logSetId.get();
-          else
-            result[0] = -1;
-        } catch (IOException e) {
-          log.error("Unable to create loggers", e);
-        }
-      }
-    });
-    return result[0];
-  }
-  
-  public void getLoggers(Set<String> loggersOut) {
-    logSetLock.readLock().lock();
-    try {
-      for (DfsLogger logger : loggers) {
-        loggersOut.add(logger.toString());
-      }
-    } finally {
-      logSetLock.readLock().unlock();
-    }
-  }
-  
-  synchronized private void createLoggers() throws IOException {
-    if (!logSetLock.isWriteLockedByCurrentThread()) {
-      throw new IllegalStateException("createLoggers should be called with write lock held!");
-    }
-    
-    if (loggers.size() != 0) {
-      throw new IllegalStateException("createLoggers should not be called when loggers.size() is " + loggers.size());
-    }
-    
-    try {
-      DfsLogger alog = new DfsLogger(tserver.getServerConfig());
-      alog.open(tserver.getClientAddressString());
-      loggers.add(alog);
-      logSetId.incrementAndGet();
-      return;
-    } catch (Exception t) {
-      throw new RuntimeException(t);
-    }
-  }
-  
-  public void resetLoggers() throws IOException {
-    logSetLock.writeLock().lock();
-    try {
-      close();
-    } finally {
-      logSetLock.writeLock().unlock();
-    }
-  }
-  
-  synchronized private void close() throws IOException {
-    if (!logSetLock.isWriteLockedByCurrentThread()) {
-      throw new IllegalStateException("close should be called with write lock held!");
-    }
-    try {
-      for (DfsLogger logger : loggers) {
-        try {
-          logger.close();
-        } catch (DfsLogger.LogClosedException ex) {
-          // ignore
-        } catch (Throwable ex) {
-          log.error("Unable to cleanly close log " + logger.getFileName() + ": " + ex);
-        }
-      }
-      loggers.clear();
-      logSizeEstimate.set(0);
-    } catch (Throwable t) {
-      throw new IOException(t);
-    }
-  }
-  
-  interface Writer {
-    LoggerOperation write(DfsLogger logger, int seq) throws Exception;
-  }
-  
-  private int write(CommitSession commitSession, boolean mincFinish, Writer writer) throws IOException {
-    List<CommitSession> sessions = Collections.singletonList(commitSession);
-    return write(sessions, mincFinish, writer);
-  }
-  
-  private int write(Collection<CommitSession> sessions, boolean mincFinish, Writer writer) throws IOException {
-    // Work very hard not to lock this during calls to the outside world
-    int currentLogSet = logSetId.get();
-    
-    int seq = -1;
-    
-    int attempt = 0;
-    boolean success = false;
-    while (!success) {
-      try {
-        // get a reference to the loggers that no other thread can touch
-        ArrayList<DfsLogger> copy = new ArrayList<DfsLogger>();
-        currentLogSet = initializeLoggers(copy);
-        
-        // add the logger to the log set for the memory in the tablet,
-        // update the metadata table if we've never used this tablet
-        
-        if (currentLogSet == logSetId.get()) {
-          for (CommitSession commitSession : sessions) {
-            if (commitSession.beginUpdatingLogsUsed(copy, mincFinish)) {
-              try {
-                // Scribble out a tablet definition and then write to the metadata table
-                defineTablet(commitSession);
-                if (currentLogSet == logSetId.get())
-                  tserver.addLoggersToMetadata(copy, commitSession.getExtent(), commitSession.getLogId());
-              } finally {
-                commitSession.finishUpdatingLogsUsed();
-              }
-            }
-          }
-        }
-        
-        // Make sure that the logs haven't changed out from underneath our copy
-        if (currentLogSet == logSetId.get()) {
-          
-          // write the mutation to the logs
-          seq = seqGen.incrementAndGet();
-          if (seq < 0)
-            throw new RuntimeException("Logger sequence generator wrapped!  Onos!!!11!eleven");
-          ArrayList<LoggerOperation> queuedOperations = new ArrayList<LoggerOperation>(copy.size());
-          for (DfsLogger wal : copy) {
-            LoggerOperation lop = writer.write(wal, seq);
-            if (lop != null)
-              queuedOperations.add(lop);
-          }
-          
-          for (LoggerOperation lop : queuedOperations) {
-            lop.await();
-          }
-          
-          // double-check: did the log set change?
-          success = (currentLogSet == logSetId.get());
-        }
-      } catch (DfsLogger.LogClosedException ex) {
-        log.debug("Logs closed while writing, retrying " + (attempt + 1));
-      } catch (Exception t) {
-        log.error("Unexpected error writing to log, retrying attempt " + (attempt + 1), t);
-        UtilWaitThread.sleep(100);
-      } finally {
-        attempt++;
-      }
-      // Some sort of write failure occurred. Grab the write lock and reset the logs.
-      // But since multiple threads will attempt it, only attempt the reset when
-      // the logs haven't changed.
-      final int finalCurrent = currentLogSet;
-      if (!success) {
-        testLockAndRun(logSetLock, new TestCallWithWriteLock() {
-          
-          @Override
-          boolean test() {
-            return finalCurrent == logSetId.get();
-          }
-          
-          @Override
-          void withWriteLock() throws IOException {
-            close();
-          }
-        });
-      }
-    }
-    // if the log gets too big, reset it .. grab the write lock first
-    logSizeEstimate.addAndGet(4 * 3); // event, tid, seq overhead
-    testLockAndRun(logSetLock, new TestCallWithWriteLock() {
-      boolean test() {
-        return logSizeEstimate.get() > maxSize;
-      }
-      
-      void withWriteLock() throws IOException {
-        close();
-      }
-    });
-    return seq;
-  }
-  
-  public int defineTablet(final CommitSession commitSession) throws IOException {
-    // scribble this into the metadata tablet, too.
-    if (!enabled(commitSession))
-      return -1;
-    return write(commitSession, false, new Writer() {
-      @Override
-      public LoggerOperation write(DfsLogger logger, int ignored) throws Exception {
-        logger.defineTablet(commitSession.getWALogSeq(), commitSession.getLogId(), commitSession.getExtent());
-        return null;
-      }
-    });
-  }
-  
-  public int log(final CommitSession commitSession, final int tabletSeq, final Mutation m) throws IOException {
-    if (!enabled(commitSession))
-      return -1;
-    int seq = write(commitSession, false, new Writer() {
-      @Override
-      public LoggerOperation write(DfsLogger logger, int ignored) throws Exception {
-        return logger.log(tabletSeq, commitSession.getLogId(), m);
-      }
-    });
-    logSizeEstimate.addAndGet(m.numBytes());
-    return seq;
-  }
-  
-  public int logManyTablets(Map<CommitSession,List<Mutation>> mutations) throws IOException {
-    
-    final Map<CommitSession,List<Mutation>> loggables = new HashMap<CommitSession,List<Mutation>>(mutations);
-    for (CommitSession t : mutations.keySet()) {
-      if (!enabled(t))
-        loggables.remove(t);
-    }
-    if (loggables.size() == 0)
-      return -1;
-    
-    int seq = write(loggables.keySet(), false, new Writer() {
-      @Override
-      public LoggerOperation write(DfsLogger logger, int ignored) throws Exception {
-        List<TabletMutations> copy = new ArrayList<TabletMutations>(loggables.size());
-        for (Entry<CommitSession,List<Mutation>> entry : loggables.entrySet()) {
-          CommitSession cs = entry.getKey();
-          copy.add(new TabletMutations(cs.getLogId(), cs.getWALogSeq(), entry.getValue()));
-        }
-        return logger.logManyTablets(copy);
-      }
-    });
-    for (List<Mutation> entry : loggables.values()) {
-      if (entry.size() < 1)
-        throw new IllegalArgumentException("logManyTablets: logging empty mutation list");
-      for (Mutation m : entry) {
-        logSizeEstimate.addAndGet(m.numBytes());
-      }
-    }
-    return seq;
-  }
-  
-  public void minorCompactionFinished(final CommitSession commitSession, final String fullyQualifiedFileName, final int walogSeq) throws IOException {
-    
-    if (!enabled(commitSession))
-      return;
-    
-    long t1 = System.currentTimeMillis();
-    
-    int seq = write(commitSession, true, new Writer() {
-      @Override
-      public LoggerOperation write(DfsLogger logger, int ignored) throws Exception {
-        logger.minorCompactionFinished(walogSeq, commitSession.getLogId(), fullyQualifiedFileName);
-        return null;
-      }
-    });
-    
-    long t2 = System.currentTimeMillis();
-    
-    log.debug(" wrote MinC finish  " + seq + ": writeTime:" + (t2 - t1) + "ms ");
-  }
-  
-  public int minorCompactionStarted(final CommitSession commitSession, final int seq, final String fullyQualifiedFileName) throws IOException {
-    if (!enabled(commitSession))
-      return -1;
-    write(commitSession, false, new Writer() {
-      @Override
-      public LoggerOperation write(DfsLogger logger, int ignored) throws Exception {
-        logger.minorCompactionStarted(seq, commitSession.getLogId(), fullyQualifiedFileName);
-        return null;
-      }
-    });
-    return seq;
-  }
-  
-  public void recover(VolumeManager fs, Tablet tablet, List<Path> logs, Set<String> tabletFiles, MutationReceiver mr) throws IOException {
-    if (!enabled(tablet))
-      return;
-    try {
-      SortedLogRecovery recovery = new SortedLogRecovery(fs);
-      KeyExtent extent = tablet.getExtent();
-      recovery.recover(extent, logs, tabletFiles, mr);
-    } catch (Exception e) {
-      throw new IOException(e);
-    }
-  }
-  
-}

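The testLockAndRun helper in the removed TabletServerLogger above follows the upgrade/downgrade idiom from the ReentrantReadWriteLock Javadoc: test a condition under the read lock, release it, take the write lock, re-test before doing the work, then reacquire the read lock before releasing the write lock. A minimal standalone sketch of that idiom (CachedData and rebuild() are illustrative names, not part of this commit):

import java.util.concurrent.locks.ReentrantReadWriteLock;

// Sketch of the read-lock test / write-lock upgrade pattern used by testLockAndRun.
class CachedData {
  private final ReentrantReadWriteLock rwlock = new ReentrantReadWriteLock();
  private Object data;            // guarded by rwlock
  private volatile boolean valid; // condition tested under the read lock

  Object get() {
    rwlock.readLock().lock();
    try {
      if (!valid) {                     // does some condition need the write lock?
        rwlock.readLock().unlock();     // cannot upgrade directly, so drop the read lock
        rwlock.writeLock().lock();
        try {
          if (!valid) {                 // re-test: another thread may have done the work
            data = rebuild();
            valid = true;
          }
        } finally {
          rwlock.readLock().lock();     // downgrade: reacquire the read lock ...
          rwlock.writeLock().unlock();  // ... before giving up the write lock
        }
      }
      return data;                      // still holding the read lock here
    } finally {
      rwlock.readLock().unlock();
    }
  }

  private Object rebuild() {
    return new Object();
  }
}

The second test matters because another thread may already have done the work in the window where no lock was held; TabletServerLogger combines the same trick with its logSetId generation counter so that only one thread recreates the write-ahead log set after a failure.
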
http://git-wip-us.apache.org/repos/asf/accumulo/blob/a10587ed/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/mastermessage/MasterMessage.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/mastermessage/MasterMessage.java b/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/mastermessage/MasterMessage.java
deleted file mode 100644
index 0a6badf..0000000
--- a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/mastermessage/MasterMessage.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.tabletserver.mastermessage;
-
-import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
-import org.apache.accumulo.core.master.thrift.MasterClientService;
-import org.apache.accumulo.core.security.thrift.TCredentials;
-import org.apache.thrift.TException;
-
-public interface MasterMessage {
-  
-  void send(TCredentials info, String serverName, MasterClientService.Iface client) throws TException, ThriftSecurityException;
-  
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/a10587ed/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/mastermessage/SplitReportMessage.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/mastermessage/SplitReportMessage.java b/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/mastermessage/SplitReportMessage.java
deleted file mode 100644
index 7b83576..0000000
--- a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/mastermessage/SplitReportMessage.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.tabletserver.mastermessage;
-
-import java.util.Map;
-import java.util.TreeMap;
-
-import org.apache.accumulo.trace.instrument.Tracer;
-import org.apache.accumulo.core.client.impl.Translator;
-import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
-import org.apache.accumulo.core.data.KeyExtent;
-import org.apache.accumulo.core.master.thrift.MasterClientService;
-import org.apache.accumulo.core.master.thrift.TabletSplit;
-import org.apache.accumulo.core.security.thrift.TCredentials;
-import org.apache.hadoop.io.Text;
-import org.apache.thrift.TException;
-
-public class SplitReportMessage implements MasterMessage {
-  Map<KeyExtent,Text> extents;
-  KeyExtent old_extent;
-  
-  public SplitReportMessage(KeyExtent old_extent, Map<KeyExtent,Text> newExtents) {
-    this.old_extent = old_extent;
-    extents = new TreeMap<KeyExtent,Text>(newExtents);
-  }
-  
-  public SplitReportMessage(KeyExtent old_extent, KeyExtent ne1, Text np1, KeyExtent ne2, Text np2) {
-    this.old_extent = old_extent;
-    extents = new TreeMap<KeyExtent,Text>();
-    extents.put(ne1, np1);
-    extents.put(ne2, np2);
-  }
-  
-  public void send(TCredentials credentials, String serverName, MasterClientService.Iface client) throws TException, ThriftSecurityException {
-    TabletSplit split = new TabletSplit();
-    split.oldTablet = old_extent.toThrift();
-    split.newTablets = Translator.translate(extents.keySet(), Translator.KET);
-    client.reportSplitExtent(Tracer.traceInfo(), credentials, serverName, split);
-  }
-  
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/a10587ed/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/mastermessage/TabletStatusMessage.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/mastermessage/TabletStatusMessage.java b/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/mastermessage/TabletStatusMessage.java
deleted file mode 100644
index 69dedbf..0000000
--- a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/mastermessage/TabletStatusMessage.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.tabletserver.mastermessage;
-
-import org.apache.accumulo.trace.instrument.Tracer;
-import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
-import org.apache.accumulo.core.data.KeyExtent;
-import org.apache.accumulo.core.master.thrift.TabletLoadState;
-import org.apache.accumulo.core.master.thrift.MasterClientService.Iface;
-import org.apache.accumulo.core.security.thrift.TCredentials;
-import org.apache.thrift.TException;
-
-public class TabletStatusMessage implements MasterMessage {
-  
-  private KeyExtent extent;
-  private TabletLoadState status;
-  
-  public TabletStatusMessage(TabletLoadState status, KeyExtent extent) {
-    this.extent = extent;
-    this.status = status;
-  }
-  
-  public void send(TCredentials auth, String serverName, Iface client) throws TException, ThriftSecurityException {
-    client.reportTabletStatus(Tracer.traceInfo(), auth, serverName, status, extent.toThrift());
-  }
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/a10587ed/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/metrics/TabletServerMBean.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/metrics/TabletServerMBean.java b/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/metrics/TabletServerMBean.java
deleted file mode 100644
index 8f41720..0000000
--- a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/metrics/TabletServerMBean.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.tabletserver.metrics;
-
-public interface TabletServerMBean {
-  
-  public int getOnlineCount();
-  
-  public int getOpeningCount();
-  
-  public int getUnopenedCount();
-  
-  public int getMajorCompactions();
-  
-  public int getMajorCompactionsQueued();
-  
-  public int getMinorCompactions();
-  
-  public int getMinorCompactionsQueued();
-  
-  public long getEntries();
-  
-  public long getEntriesInMemory();
-  
-  public long getQueries();
-  
-  public long getIngest();
-  
-  public long getTotalMinorCompactions();
-  
-  public double getHoldTime();
-  
-  public String getName();
-  
-  public double getAverageFilesPerTablet();
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/a10587ed/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/metrics/TabletServerMinCMetrics.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/metrics/TabletServerMinCMetrics.java b/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/metrics/TabletServerMinCMetrics.java
deleted file mode 100644
index b0225a3..0000000
--- a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/metrics/TabletServerMinCMetrics.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.tabletserver.metrics;
-
-import javax.management.ObjectName;
-
-import org.apache.accumulo.server.metrics.AbstractMetricsImpl;
-
-public class TabletServerMinCMetrics extends AbstractMetricsImpl implements TabletServerMinCMetricsMBean {
-  
-  static final org.apache.log4j.Logger log = org.apache.log4j.Logger.getLogger(TabletServerMinCMetrics.class);
-  
-  private static final String METRICS_PREFIX = "tserver.minc";
-  
-  private static ObjectName OBJECT_NAME = null;
-  
-  public TabletServerMinCMetrics() {
-    super();
-    reset();
-    try {
-      OBJECT_NAME = new ObjectName("accumulo.server.metrics:service=TServerInfo,name=TabletServerMinCMetricsMBean,instance=" + Thread.currentThread().getName());
-    } catch (Exception e) {
-      log.error("Exception setting MBean object name", e);
-    }
-  }
-  
-  @Override
-  protected ObjectName getObjectName() {
-    return OBJECT_NAME;
-  }
-  
-  @Override
-  protected String getMetricsPrefix() {
-    return METRICS_PREFIX;
-  }
-  
-  public long getMinorCompactionMinTime() {
-    return this.getMetricMin(minc);
-  }
-  
-  public long getMinorCompactionAvgTime() {
-    return this.getMetricAvg(minc);
-  }
-  
-  public long getMinorCompactionCount() {
-    return this.getMetricCount(minc);
-  }
-  
-  public long getMinorCompactionMaxTime() {
-    return this.getMetricMax(minc);
-  }
-  
-  public long getMinorCompactionQueueAvgTime() {
-    return this.getMetricAvg(queue);
-  }
-  
-  public long getMinorCompactionQueueCount() {
-    return this.getMetricCount(queue);
-  }
-  
-  public long getMinorCompactionQueueMaxTime() {
-    return this.getMetricMax(queue);
-  }
-  
-  public long getMinorCompactionQueueMinTime() {
-    return this.getMetricMin(minc);
-  }
-  
-  public void reset() {
-    createMetric("minc");
-    createMetric("queue");
-  }
-  
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/a10587ed/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/metrics/TabletServerMinCMetricsMBean.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/metrics/TabletServerMinCMetricsMBean.java b/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/metrics/TabletServerMinCMetricsMBean.java
deleted file mode 100644
index e9e254f..0000000
--- a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/metrics/TabletServerMinCMetricsMBean.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.tabletserver.metrics;
-
-public interface TabletServerMinCMetricsMBean {
-  
-  public static final String minc = "minc";
-  public static final String queue = "queue";
-  
-  public long getMinorCompactionCount();
-  
-  public long getMinorCompactionAvgTime();
-  
-  public long getMinorCompactionMinTime();
-  
-  public long getMinorCompactionMaxTime();
-  
-  public long getMinorCompactionQueueCount();
-  
-  public long getMinorCompactionQueueAvgTime();
-  
-  public long getMinorCompactionQueueMinTime();
-  
-  public long getMinorCompactionQueueMaxTime();
-  
-  public void reset();
-  
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/a10587ed/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/metrics/TabletServerScanMetrics.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/metrics/TabletServerScanMetrics.java b/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/metrics/TabletServerScanMetrics.java
deleted file mode 100644
index e2dad1b..0000000
--- a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/metrics/TabletServerScanMetrics.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.tabletserver.metrics;
-
-import javax.management.ObjectName;
-
-import org.apache.accumulo.server.metrics.AbstractMetricsImpl;
-
-public class TabletServerScanMetrics extends AbstractMetricsImpl implements TabletServerScanMetricsMBean {
-  
-  static final org.apache.log4j.Logger log = org.apache.log4j.Logger.getLogger(TabletServerScanMetrics.class);
-  
-  public static final String METRICS_PREFIX = "tserver.scan";
-  
-  public static ObjectName OBJECT_NAME = null;
-  
-  public TabletServerScanMetrics() {
-    super();
-    reset();
-    try {
-      OBJECT_NAME = new ObjectName("accumulo.server.metrics:service=TServerInfo,name=TabletServerScanMetricsMBean,instance=" + Thread.currentThread().getName());
-    } catch (Exception e) {
-      log.error("Exception setting MBean object name", e);
-    }
-  }
-  
-  @Override
-  protected ObjectName getObjectName() {
-    return OBJECT_NAME;
-  }
-  
-  @Override
-  protected String getMetricsPrefix() {
-    return METRICS_PREFIX;
-  }
-  
-  public long getResultAvgSize() {
-    return this.getMetricAvg(resultSize);
-  }
-  
-  public long getResultCount() {
-    return this.getMetricCount(resultSize);
-  }
-  
-  public long getResultMaxSize() {
-    return this.getMetricMax(resultSize);
-  }
-  
-  public long getResultMinSize() {
-    return this.getMetricMin(resultSize);
-  }
-  
-  public long getScanAvgTime() {
-    return this.getMetricAvg(scan);
-  }
-  
-  public long getScanCount() {
-    return this.getMetricCount(scan);
-  }
-  
-  public long getScanMaxTime() {
-    return this.getMetricMax(scan);
-  }
-  
-  public long getScanMinTime() {
-    return this.getMetricMin(scan);
-  }
-  
-  public void reset() {
-    createMetric(scan);
-    createMetric(resultSize);
-  }
-  
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/a10587ed/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/metrics/TabletServerScanMetricsMBean.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/metrics/TabletServerScanMetricsMBean.java b/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/metrics/TabletServerScanMetricsMBean.java
deleted file mode 100644
index 1fda37c..0000000
--- a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/metrics/TabletServerScanMetricsMBean.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.tabletserver.metrics;
-
-public interface TabletServerScanMetricsMBean {
-  
-  public static final String scan = "scan";
-  public static final String resultSize = "result";
-  
-  public long getScanCount();
-  
-  public long getScanAvgTime();
-  
-  public long getScanMinTime();
-  
-  public long getScanMaxTime();
-  
-  public long getResultCount();
-  
-  public long getResultAvgSize();
-  
-  public long getResultMinSize();
-  
-  public long getResultMaxSize();
-  
-  public void reset();
-  
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/a10587ed/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/metrics/TabletServerUpdateMetrics.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/metrics/TabletServerUpdateMetrics.java b/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/metrics/TabletServerUpdateMetrics.java
deleted file mode 100644
index d256cff..0000000
--- a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/metrics/TabletServerUpdateMetrics.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.tabletserver.metrics;
-
-import javax.management.ObjectName;
-
-import org.apache.accumulo.server.metrics.AbstractMetricsImpl;
-
-public class TabletServerUpdateMetrics extends AbstractMetricsImpl implements TabletServerUpdateMetricsMBean {
-  
-  static final org.apache.log4j.Logger log = org.apache.log4j.Logger.getLogger(TabletServerUpdateMetrics.class);
-  
-  private static final String METRICS_PREFIX = "tserver.update";
-  
-  private static ObjectName OBJECT_NAME = null;
-  
-  public TabletServerUpdateMetrics() {
-    super();
-    reset();
-    try {
-      OBJECT_NAME = new ObjectName("accumulo.server.metrics:service=TServerInfo,name=TabletServerUpdateMetricsMBean,instance="
-          + Thread.currentThread().getName());
-    } catch (Exception e) {
-      log.error("Exception setting MBean object name", e);
-    }
-  }
-  
-  @Override
-  protected ObjectName getObjectName() {
-    return OBJECT_NAME;
-  }
-  
-  @Override
-  protected String getMetricsPrefix() {
-    return METRICS_PREFIX;
-  }
-  
-  public long getPermissionErrorCount() {
-    return this.getMetricCount(permissionErrors);
-  }
-  
-  public long getUnknownTabletErrorCount() {
-    return this.getMetricCount(unknownTabletErrors);
-  }
-  
-  public long getMutationArrayAvgSize() {
-    return this.getMetricAvg(mutationArraySize);
-  }
-  
-  public long getMutationArrayMinSize() {
-    return this.getMetricMin(mutationArraySize);
-  }
-  
-  public long getMutationArrayMaxSize() {
-    return this.getMetricMax(mutationArraySize);
-  }
-  
-  public long getCommitPrepCount() {
-    return this.getMetricCount(commitPrep);
-  }
-  
-  public long getCommitPrepMinTime() {
-    return this.getMetricMin(commitPrep);
-  }
-  
-  public long getCommitPrepMaxTime() {
-    return this.getMetricMax(commitPrep);
-  }
-  
-  public long getCommitPrepAvgTime() {
-    return this.getMetricAvg(commitPrep);
-  }
-  
-  public long getConstraintViolationCount() {
-    return this.getMetricCount(constraintViolations);
-  }
-  
-  public long getWALogWriteCount() {
-    return this.getMetricCount(waLogWriteTime);
-  }
-  
-  public long getWALogWriteMinTime() {
-    return this.getMetricMin(waLogWriteTime);
-  }
-  
-  public long getWALogWriteMaxTime() {
-    return this.getMetricMax(waLogWriteTime);
-  }
-  
-  public long getWALogWriteAvgTime() {
-    return this.getMetricAvg(waLogWriteTime);
-  }
-  
-  public long getCommitCount() {
-    return this.getMetricCount(commitTime);
-  }
-  
-  public long getCommitMinTime() {
-    return this.getMetricMin(commitTime);
-  }
-  
-  public long getCommitMaxTime() {
-    return this.getMetricMax(commitTime);
-  }
-  
-  public long getCommitAvgTime() {
-    return this.getMetricAvg(commitTime);
-  }
-  
-  public void reset() {
-    createMetric(permissionErrors);
-    createMetric(unknownTabletErrors);
-    createMetric(mutationArraySize);
-    createMetric(commitPrep);
-    createMetric(constraintViolations);
-    createMetric(waLogWriteTime);
-    createMetric(commitTime);
-  }
-  
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/a10587ed/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/metrics/TabletServerUpdateMetricsMBean.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/metrics/TabletServerUpdateMetricsMBean.java b/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/metrics/TabletServerUpdateMetricsMBean.java
deleted file mode 100644
index bfa58ec..0000000
--- a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/metrics/TabletServerUpdateMetricsMBean.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.tabletserver.metrics;
-
-public interface TabletServerUpdateMetricsMBean {
-  
-  public final static String permissionErrors = "permissionErrors";
-  public final static String unknownTabletErrors = "unknownTabletErrors";
-  public final static String mutationArraySize = "mutationArraysSize";
-  public final static String commitPrep = "commitPrep";
-  public final static String constraintViolations = "constraintViolations";
-  public final static String waLogWriteTime = "waLogWriteTime";
-  public final static String commitTime = "commitTime";
-  
-  public long getPermissionErrorCount();
-  
-  public long getUnknownTabletErrorCount();
-  
-  public long getMutationArrayAvgSize();
-  
-  public long getMutationArrayMinSize();
-  
-  public long getMutationArrayMaxSize();
-  
-  public long getCommitPrepCount();
-  
-  public long getCommitPrepMinTime();
-  
-  public long getCommitPrepMaxTime();
-  
-  public long getCommitPrepAvgTime();
-  
-  public long getConstraintViolationCount();
-  
-  public long getWALogWriteCount();
-  
-  public long getWALogWriteMinTime();
-  
-  public long getWALogWriteMaxTime();
-  
-  public long getWALogWriteAvgTime();
-  
-  public long getCommitCount();
-  
-  public long getCommitMinTime();
-  
-  public long getCommitMaxTime();
-  
-  public long getCommitAvgTime();
-  
-  public void reset();
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/a10587ed/server/tserver/src/main/java/org/apache/accumulo/tserver/BulkFailedCopyProcessor.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/BulkFailedCopyProcessor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/BulkFailedCopyProcessor.java
new file mode 100644
index 0000000..97fb7db
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/BulkFailedCopyProcessor.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver;
+
+import java.io.IOException;
+
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.trace.TraceFileSystem;
+import org.apache.accumulo.server.zookeeper.DistributedWorkQueue.Processor;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+/**
+ * Copy failed bulk imports.
+ */
+public class BulkFailedCopyProcessor implements Processor {
+  
+  private static final Logger log = Logger.getLogger(BulkFailedCopyProcessor.class);
+  
+  @Override
+  public Processor newProcessor() {
+    return new BulkFailedCopyProcessor();
+  }
+  
+  @Override
+  public void process(String workID, byte[] data) {
+    
+    String paths[] = new String(data).split(",");
+    
+    Path orig = new Path(paths[0]);
+    Path dest = new Path(paths[1]);
+    Path tmp = new Path(dest.getParent(), dest.getName() + ".tmp");
+    
+    try {
+      FileSystem fs = TraceFileSystem.wrap(org.apache.accumulo.core.file.FileUtil.getFileSystem(CachedConfiguration.getInstance(),
+          ServerConfiguration.getSiteConfiguration()));
+      
+      FileUtil.copy(fs, orig, fs, tmp, false, true, CachedConfiguration.getInstance());
+      fs.rename(tmp, dest);
+      log.debug("copied " + orig + " to " + dest);
+    } catch (IOException ex) {
+      try {
+        FileSystem fs = TraceFileSystem.wrap(org.apache.accumulo.core.file.FileUtil.getFileSystem(CachedConfiguration.getInstance(),
+            ServerConfiguration.getSiteConfiguration()));
+        
+        fs.create(dest).close();
+        log.warn(" marked " + dest + " failed", ex);
+      } catch (IOException e) {
+        log.error("Unable to create failure flag file " + dest, e);
+      }
+    }
+
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/a10587ed/server/tserver/src/main/java/org/apache/accumulo/tserver/CompactionQueue.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/CompactionQueue.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/CompactionQueue.java
new file mode 100644
index 0000000..a0574d9
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/CompactionQueue.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver;
+
+import java.util.AbstractQueue;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class CompactionQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> {
+  
+  private List<Comparable> task = new LinkedList<Comparable>();
+  
+  @Override
+  public synchronized Runnable poll() {
+    if (task.size() == 0)
+      return null;
+    
+    Comparable min = Collections.min(task);
+    task.remove(min);
+    return (Runnable) min;
+  }
+  
+  @Override
+  public synchronized Runnable peek() {
+    if (task.size() == 0)
+      return null;
+    
+    Comparable min = Collections.min(task);
+    return (Runnable) min;
+  }
+  
+  @Override
+  public synchronized boolean offer(Runnable e) {
+    task.add((Comparable) e);
+    notify();
+    return true;
+  }
+  
+  @Override
+  public synchronized void put(Runnable e) throws InterruptedException {
+    task.add((Comparable) e);
+    notify();
+  }
+  
+  @Override
+  public synchronized boolean offer(Runnable e, long timeout, TimeUnit unit) throws InterruptedException {
+    task.add((Comparable) e);
+    notify();
+    return true;
+  }
+  
+  @Override
+  public synchronized Runnable take() throws InterruptedException {
+    while (task.size() == 0) {
+      wait();
+    }
+    
+    return poll();
+  }
+  
+  @Override
+  public synchronized Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
+    if (task.size() == 0) {
+      wait(unit.toMillis(timeout));
+    }
+    
+    if (task.size() == 0)
+      return null;
+    
+    return poll();
+  }
+  
+  @Override
+  public synchronized int remainingCapacity() {
+    return Integer.MAX_VALUE;
+  }
+  
+  @Override
+  public synchronized int drainTo(Collection<? super Runnable> c) {
+    return drainTo(c, task.size());
+  }
+  
+  @Override
+  public synchronized int drainTo(Collection<? super Runnable> c, int maxElements) {
+    Collections.sort(task);
+    
+    int num = Math.min(task.size(), maxElements);
+    
+    Iterator<Comparable> iter = task.iterator();
+    for (int i = 0; i < num; i++) {
+      c.add((Runnable) iter.next());
+      iter.remove();
+    }
+    
+    return num;
+  }
+  
+  @Override
+  public synchronized Iterator<Runnable> iterator() {
+    Collections.sort(task);
+    
+    final Iterator<Comparable> iter = task.iterator();
+    
+    return new Iterator<Runnable>() {
+      
+      @Override
+      public boolean hasNext() {
+        return iter.hasNext();
+      }
+      
+      @Override
+      public Runnable next() {
+        return (Runnable) iter.next();
+      }
+      
+      @Override
+      public void remove() {
+        iter.remove();
+      }
+    };
+  }
+  
+  @Override
+  public synchronized int size() {
+    return task.size();
+  }
+  
+}

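The new CompactionQueue is an unbounded BlockingQueue that always hands out the smallest element, using Collections.min over a plain LinkedList, so compaction Runnables that also implement Comparable come back in priority order (presumably as the work queue for the tserver's compaction thread pool, though that wiring is not part of this hunk). A minimal usage sketch; PrioritizedTask and the demo class are illustrative only:

import org.apache.accumulo.tserver.CompactionQueue;

// Tasks must implement both Runnable and Comparable: offer() casts to Comparable,
// poll() returns the smallest task first regardless of insertion order.
class PrioritizedTask implements Runnable, Comparable<PrioritizedTask> {
  final int priority; // lower value runs sooner

  PrioritizedTask(int priority) {
    this.priority = priority;
  }

  @Override
  public void run() {
    System.out.println("running task with priority " + priority);
  }

  @Override
  public int compareTo(PrioritizedTask o) {
    return Integer.compare(priority, o.priority);
  }
}

class CompactionQueueDemo {
  public static void main(String[] args) {
    CompactionQueue queue = new CompactionQueue();
    queue.offer(new PrioritizedTask(5));
    queue.offer(new PrioritizedTask(1));
    queue.offer(new PrioritizedTask(3));
    // poll() picks Collections.min over the backing list, so this prints 1, 3, 5.
    for (Runnable r = queue.poll(); r != null; r = queue.poll()) {
      r.run();
    }
  }
}

Note that offer() casts its argument to Comparable, so anything submitted that does not implement Comparable fails with a ClassCastException at enqueue time.
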
http://git-wip-us.apache.org/repos/asf/accumulo/blob/a10587ed/server/tserver/src/main/java/org/apache/accumulo/tserver/CompactionStats.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/CompactionStats.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/CompactionStats.java
new file mode 100644
index 0000000..d359e95
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/CompactionStats.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver;
+
+public class CompactionStats {
+  private long entriesRead;
+  private long entriesWritten;
+  private long fileSize;
+  
+  CompactionStats(long er, long ew) {
+    this.setEntriesRead(er);
+    this.setEntriesWritten(ew);
+  }
+  
+  public CompactionStats() {}
+  
+  private void setEntriesRead(long entriesRead) {
+    this.entriesRead = entriesRead;
+  }
+  
+  public long getEntriesRead() {
+    return entriesRead;
+  }
+  
+  private void setEntriesWritten(long entriesWritten) {
+    this.entriesWritten = entriesWritten;
+  }
+  
+  public long getEntriesWritten() {
+    return entriesWritten;
+  }
+  
+  public void add(CompactionStats mcs) {
+    this.entriesRead += mcs.entriesRead;
+    this.entriesWritten += mcs.entriesWritten;
+  }
+  
+  public void setFileSize(long fileSize) {
+    this.fileSize = fileSize;
+  }
+  
+  public long getFileSize() {
+    return this.fileSize;
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/a10587ed/server/tserver/src/main/java/org/apache/accumulo/tserver/CompactionWatcher.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/CompactionWatcher.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/CompactionWatcher.java
new file mode 100644
index 0000000..864e22f
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/CompactionWatcher.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.server.util.time.SimpleTimer;
+import org.apache.accumulo.tserver.Compactor.CompactionInfo;
+import org.apache.log4j.Logger;
+
+/**
+ * 
+ */
+public class CompactionWatcher implements Runnable {
+  private Map<List<Long>,ObservedCompactionInfo> observedCompactions = new HashMap<List<Long>,ObservedCompactionInfo>();
+  private AccumuloConfiguration config;
+  private static boolean watching = false;
+  
+  private static class ObservedCompactionInfo {
+    CompactionInfo compactionInfo;
+    long firstSeen;
+    boolean loggedWarning;
+    
+    ObservedCompactionInfo(CompactionInfo ci, long time) {
+      this.compactionInfo = ci;
+      this.firstSeen = time;
+    }
+  }
+
+  public CompactionWatcher(AccumuloConfiguration config) {
+    this.config = config;
+  }
+
+  public void run() {
+    List<CompactionInfo> runningCompactions = Compactor.getRunningCompactions();
+    
+    Set<List<Long>> newKeys = new HashSet<List<Long>>();
+    
+    long time = System.currentTimeMillis();
+
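+    // each key combines the compactor id with its progress counters, so any progress produces a new key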
+    for (CompactionInfo ci : runningCompactions) {
+      List<Long> compactionKey = Arrays.asList(ci.getID(), ci.getEntriesRead(), ci.getEntriesWritten());
+      newKeys.add(compactionKey);
+      
+      if (!observedCompactions.containsKey(compactionKey)) {
+        observedCompactions.put(compactionKey, new ObservedCompactionInfo(ci, time));
+      }
+    }
+    
+    // find compactions that completed or made progress since the last pass; if a stuck
+    // warning was previously logged for one of them, note that it is no longer stuck
+    HashMap<List<Long>,ObservedCompactionInfo> copy = new HashMap<List<Long>,ObservedCompactionInfo>(observedCompactions);
+    copy.keySet().removeAll(newKeys);
+    
+    for (ObservedCompactionInfo oci : copy.values()) {
+      if (oci.loggedWarning) {
+        Logger.getLogger(CompactionWatcher.class).info("Compaction of " + oci.compactionInfo.getExtent() + " is no longer stuck");
+      }
+    }
+
+    // remove any compaction that completed or made progress
+    observedCompactions.keySet().retainAll(newKeys);
+    
+    long warnTime = config.getTimeInMillis(Property.TSERV_COMPACTION_WARN_TIME);
+
+    // check for stuck compactions
+    for (ObservedCompactionInfo oci : observedCompactions.values()) {
+      if (time - oci.firstSeen > warnTime && !oci.loggedWarning) {
+        Thread compactionThread = oci.compactionInfo.getThread();
+        if (compactionThread != null) {
+          StackTraceElement[] trace = compactionThread.getStackTrace();
+          Exception e = new Exception("Possible stack trace of compaction stuck on " + oci.compactionInfo.getExtent());
+          e.setStackTrace(trace);
+          Logger.getLogger(CompactionWatcher.class).warn(
+              "Compaction of " + oci.compactionInfo.getExtent() + " has not made progress for at least " + (time - oci.firstSeen) + "ms", e);
+          oci.loggedWarning = true;
+        }
+      }
+    }
+  }
+
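+  /**
+   * Starts a single watcher for this tablet server, scheduled to run every ten seconds. Calling
+   * this more than once has no effect.
+   */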
+  public static synchronized void startWatching(AccumuloConfiguration config) {
+    if (!watching) {
+      SimpleTimer.getInstance().schedule(new CompactionWatcher(config), 10000, 10000);
+      watching = true;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/a10587ed/server/tserver/src/main/java/org/apache/accumulo/tserver/Compactor.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/Compactor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/Compactor.java
new file mode 100644
index 0000000..f5463f3
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/Compactor.java
@@ -0,0 +1,542 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver;
+
+import java.io.IOException;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.data.thrift.IterInfo;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.file.FileSKVWriter;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.IteratorUtil;
+import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.WrappingIterator;
+import org.apache.accumulo.core.iterators.system.ColumnFamilySkippingIterator;
+import org.apache.accumulo.core.iterators.system.DeletingIterator;
+import org.apache.accumulo.core.iterators.system.MultiIterator;
+import org.apache.accumulo.core.iterators.system.TimeSettingIterator;
+import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import org.apache.accumulo.core.tabletserver.thrift.ActiveCompaction;
+import org.apache.accumulo.core.tabletserver.thrift.CompactionReason;
+import org.apache.accumulo.core.tabletserver.thrift.CompactionType;
+import org.apache.accumulo.core.util.LocalityGroupUtil;
+import org.apache.accumulo.core.util.LocalityGroupUtil.LocalityGroupConfigurationError;
+import org.apache.accumulo.server.conf.TableConfiguration;
+import org.apache.accumulo.server.fs.FileRef;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.problems.ProblemReport;
+import org.apache.accumulo.server.problems.ProblemReportingIterator;
+import org.apache.accumulo.server.problems.ProblemReports;
+import org.apache.accumulo.server.problems.ProblemType;
+import org.apache.accumulo.trace.instrument.Span;
+import org.apache.accumulo.trace.instrument.Trace;
+import org.apache.accumulo.tserver.Tablet.MajorCompactionReason;
+import org.apache.accumulo.tserver.Tablet.MinorCompactionReason;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.log4j.Logger;
+
+
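+/**
+ * Runs a single compaction: merges the input files (and, for minor compactions, an in-memory map)
+ * through the configured iterators and writes the surviving entries to a new file, reporting
+ * progress via CompactionInfo.
+ */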
+public class Compactor implements Callable<CompactionStats> {
+  
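+  /**
+   * Wraps the source iterator and counts entries as they are read, periodically publishing the
+   * count to a shared AtomicLong so progress can be observed while the compaction runs.
+   */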
+  public static class CountingIterator extends WrappingIterator {
+    
+    private long count;
+    private ArrayList<CountingIterator> deepCopies;
+    private AtomicLong entriesRead;
+    
+    public CountingIterator deepCopy(IteratorEnvironment env) {
+      return new CountingIterator(this, env);
+    }
+    
+    private CountingIterator(CountingIterator other, IteratorEnvironment env) {
+      setSource(other.getSource().deepCopy(env));
+      count = 0;
+      this.deepCopies = other.deepCopies;
+      this.entriesRead = other.entriesRead;
+      deepCopies.add(this);
+    }
+    
+    public CountingIterator(SortedKeyValueIterator<Key,Value> source, AtomicLong entriesRead) {
+      deepCopies = new ArrayList<Compactor.CountingIterator>();
+      this.setSource(source);
+      count = 0;
+      this.entriesRead = entriesRead;
+    }
+    
+    @Override
+    public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) {
+      throw new UnsupportedOperationException();
+    }
+    
+    @Override
+    public void next() throws IOException {
+      super.next();
+      count++;
+      if (count % 1024 == 0) {
+        entriesRead.addAndGet(1024);
+      }
+    }
+    
+    public long getCount() {
+      long sum = 0;
+      for (CountingIterator dc : deepCopies) {
+        sum += dc.count;
+      }
+      
+      return count + sum;
+    }
+  }
+
+  private static final Logger log = Logger.getLogger(Compactor.class);
+  
+  static class CompactionCanceledException extends Exception {
+    private static final long serialVersionUID = 1L;
+  }
+  
+  static interface CompactionEnv {
+    boolean isCompactionEnabled();
+    
+    IteratorScope getIteratorScope();
+  }
+  
+  private Map<FileRef,DataFileValue> filesToCompact;
+  private InMemoryMap imm;
+  private FileRef outputFile;
+  private boolean propogateDeletes;
+  private TableConfiguration acuTableConf;
+  private CompactionEnv env;
+  private Configuration conf;
+  private VolumeManager fs;
+  protected KeyExtent extent;
+  private List<IteratorSetting> iterators;
+  
+  // things to report
+  private String currentLocalityGroup = "";
+  private long startTime;
+
+  private MajorCompactionReason reason;
+  protected MinorCompactionReason mincReason;
+  
+  private AtomicLong entriesRead = new AtomicLong(0);
+  private AtomicLong entriesWritten = new AtomicLong(0);
+  private DateFormat dateFormatter = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS");
+  
+  private static AtomicLong nextCompactorID = new AtomicLong(0);
+  
+  // a unique id to identify a compactor
+  private long compactorID = nextCompactorID.getAndIncrement();
+
+  protected volatile Thread thread;
+
+  private synchronized void setLocalityGroup(String name) {
+    this.currentLocalityGroup = name;
+  }
+
+  private void clearStats() {
+    entriesRead.set(0);
+    entriesWritten.set(0);
+  }
+
+  protected static Set<Compactor> runningCompactions = Collections.synchronizedSet(new HashSet<Compactor>());
+  
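+  /**
+   * A snapshot of a compaction's identity and progress, taken when constructed; used by the
+   * watcher and for building the thrift ActiveCompaction representation.
+   */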
+  public static class CompactionInfo {
+    
+    private Compactor compactor;
+    private String localityGroup;
+    private long entriesRead;
+    private long entriesWritten;
+    
+    CompactionInfo(Compactor compactor) {
+      this.localityGroup = compactor.currentLocalityGroup;
+      this.entriesRead = compactor.entriesRead.get();
+      this.entriesWritten = compactor.entriesWritten.get();
+      this.compactor = compactor;
+    }
+
+    public long getID() {
+      return compactor.compactorID;
+    }
+    
+    public KeyExtent getExtent() {
+      return compactor.getExtent();
+    }
+    
+    public long getEntriesRead() {
+      return entriesRead;
+    }
+    
+    public long getEntriesWritten() {
+      return entriesWritten;
+    }
+
+    public Thread getThread() {
+      return compactor.thread;
+    }
+
+    public ActiveCompaction toThrift() {
+      
+      CompactionType type;
+      
+      if (compactor.imm != null)
+        if (compactor.filesToCompact.size() > 0)
+          type = CompactionType.MERGE;
+        else
+          type = CompactionType.MINOR;
+      else if (!compactor.propogateDeletes)
+        type = CompactionType.FULL;
+      else
+        type = CompactionType.MAJOR;
+      
+      CompactionReason reason;
+      
+      if (compactor.imm != null)
+        switch(compactor.mincReason){
+          case USER:
+            reason = CompactionReason.USER;
+            break;
+          case CLOSE:
+            reason = CompactionReason.CLOSE;
+            break;
+          case SYSTEM:
+          default:
+            reason = CompactionReason.SYSTEM;
+            break;
+        }
+      else
+        switch (compactor.reason) {
+          case USER:
+            reason = CompactionReason.USER;
+            break;
+          case CHOP:
+            reason = CompactionReason.CHOP;
+            break;
+          case IDLE:
+            reason = CompactionReason.IDLE;
+            break;
+          case NORMAL:
+          default:
+            reason = CompactionReason.SYSTEM;
+            break;
+        }
+      
+      List<IterInfo> iiList = new ArrayList<IterInfo>();
+      Map<String,Map<String,String>> iterOptions = new HashMap<String,Map<String,String>>();
+      
+      for (IteratorSetting iterSetting : compactor.iterators) {
+        iiList.add(new IterInfo(iterSetting.getPriority(), iterSetting.getIteratorClass(), iterSetting.getName()));
+        iterOptions.put(iterSetting.getName(), iterSetting.getOptions());
+      }
+      List<String> filesToCompact = new ArrayList<String>();
+      for (FileRef ref : compactor.filesToCompact.keySet())
+        filesToCompact.add(ref.toString());
+      return new ActiveCompaction(compactor.extent.toThrift(), System.currentTimeMillis() - compactor.startTime, filesToCompact,
+          compactor.outputFile.toString(), type, reason, localityGroup, entriesRead, entriesWritten, iiList, iterOptions);
+    }
+  }
+  
+  public static List<CompactionInfo> getRunningCompactions() {
+    ArrayList<CompactionInfo> compactions = new ArrayList<Compactor.CompactionInfo>();
+    
+    synchronized (runningCompactions) {
+      for (Compactor compactor : runningCompactions) {
+        compactions.add(new CompactionInfo(compactor));
+      }
+    }
+    
+    return compactions;
+  }
+
+  Compactor(Configuration conf, VolumeManager fs, Map<FileRef,DataFileValue> files, InMemoryMap imm, FileRef outputFile, boolean propogateDeletes,
+      TableConfiguration acuTableConf, KeyExtent extent, CompactionEnv env, List<IteratorSetting> iterators, MajorCompactionReason reason) {
+    this.extent = extent;
+    this.conf = conf;
+    this.fs = fs;
+    this.filesToCompact = files;
+    this.imm = imm;
+    this.outputFile = outputFile;
+    this.propogateDeletes = propogateDeletes;
+    this.acuTableConf = acuTableConf;
+    this.env = env;
+    this.iterators = iterators;
+    this.reason = reason;
+    
+    startTime = System.currentTimeMillis();
+  }
+  
+  Compactor(Configuration conf, VolumeManager fs, Map<FileRef,DataFileValue> files, InMemoryMap imm, FileRef outputFile, boolean propogateDeletes,
+      TableConfiguration acuTableConf, KeyExtent extent, CompactionEnv env) {
+    this(conf, fs, files, imm, outputFile, propogateDeletes, acuTableConf, extent, env, new ArrayList<IteratorSetting>(), null);
+  }
+  
+  public VolumeManager getFileSystem() {
+    return fs;
+  }
+  
+  KeyExtent getExtent() {
+    return extent;
+  }
+  
+  String getOutputFile() {
+    return outputFile.toString();
+  }
+  
+  @Override
+  public CompactionStats call() throws IOException, CompactionCanceledException {
+    
+    FileSKVWriter mfw = null;
+    
+    CompactionStats majCStats = new CompactionStats();
+
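+    // register this compaction so it can be observed while it runs (e.g. by CompactionWatcher)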
+    boolean remove = runningCompactions.add(this);
+    
+    clearStats();
+
+    String oldThreadName = Thread.currentThread().getName();
+    String newThreadName = "MajC compacting " + extent.toString() + " started " + dateFormatter.format(new Date()) + " file: " + outputFile;
+    Thread.currentThread().setName(newThreadName);
+    thread = Thread.currentThread();
+    try {
+      FileOperations fileFactory = FileOperations.getInstance();
+      FileSystem ns = this.fs.getFileSystemByPath(outputFile.path());
+      mfw = fileFactory.openWriter(outputFile.path().toString(), ns, ns.getConf(), acuTableConf);
+      
+      Map<String,Set<ByteSequence>> lGroups;
+      try {
+        lGroups = LocalityGroupUtil.getLocalityGroups(acuTableConf);
+      } catch (LocalityGroupConfigurationError e) {
+        throw new IOException(e);
+      }
+      
+      long t1 = System.currentTimeMillis();
+      
+      HashSet<ByteSequence> allColumnFamilies = new HashSet<ByteSequence>();
+      
+      if (mfw.supportsLocalityGroups()) {
+        for (Entry<String,Set<ByteSequence>> entry : lGroups.entrySet()) {
+          setLocalityGroup(entry.getKey());
+          compactLocalityGroup(entry.getKey(), entry.getValue(), true, mfw, majCStats);
+          allColumnFamilies.addAll(entry.getValue());
+        }
+      }
+      
+      setLocalityGroup("");
+      compactLocalityGroup(null, allColumnFamilies, false, mfw, majCStats);
+      
+      long t2 = System.currentTimeMillis();
+      
+      FileSKVWriter mfwTmp = mfw;
+      mfw = null; // set this to null so we do not try to close it again in finally if the close fails
+      mfwTmp.close(); // if the close fails it will cause the compaction to fail
+      
+      // Verify the file, since hadoop 0.20.2 sometimes lies about the success of close()
+      try {
+        FileSKVIterator openReader = fileFactory.openReader(outputFile.path().toString(), false, ns, ns.getConf(), acuTableConf);
+        openReader.close();
+      } catch (IOException ex) {
+        log.error("Verification of successful compaction fails!!! " + extent + " " + outputFile, ex);
+        throw ex;
+      }
+      
+      log.debug(String.format("Compaction %s %,d read | %,d written | %,6d entries/sec | %6.3f secs", extent, majCStats.getEntriesRead(),
+          majCStats.getEntriesWritten(), (int) (majCStats.getEntriesRead() / ((t2 - t1) / 1000.0)), (t2 - t1) / 1000.0));
+      
+      majCStats.setFileSize(fileFactory.getFileSize(outputFile.path().toString(), ns, ns.getConf(), acuTableConf));
+      return majCStats;
+    } catch (IOException e) {
+      log.error(e, e);
+      throw e;
+    } catch (RuntimeException e) {
+      log.error(e, e);
+      throw e;
+    } finally {
+      Thread.currentThread().setName(oldThreadName);
+      if (remove) {
+        thread = null;
+        runningCompactions.remove(this);
+      }
+
+      try {
+        if (mfw != null) {
+          // compaction must not have finished successfully, so close its output file
+          try {
+            mfw.close();
+          } finally {
+            if (!fs.deleteRecursively(outputFile.path()))
+              if (fs.exists(outputFile.path()))
+                log.error("Unable to delete " + outputFile);
+          }
+        }
+      } catch (IOException e) {
+        log.warn(e, e);
+      }
+    }
+  }
+
+  private List<SortedKeyValueIterator<Key,Value>> openMapDataFiles(String lgName, ArrayList<FileSKVIterator> readers) throws IOException {
+    
+    List<SortedKeyValueIterator<Key,Value>> iters = new ArrayList<SortedKeyValueIterator<Key,Value>>(filesToCompact.size());
+    
+    for (FileRef mapFile : filesToCompact.keySet()) {
+      try {
+        
+        FileOperations fileFactory = FileOperations.getInstance();
+        FileSystem fs = this.fs.getFileSystemByPath(mapFile.path());
+        FileSKVIterator reader;
+        
+        reader = fileFactory.openReader(mapFile.path().toString(), false, fs, conf, acuTableConf);
+        
+        readers.add(reader);
+        
+        SortedKeyValueIterator<Key,Value> iter = new ProblemReportingIterator(extent.getTableId().toString(), mapFile.path().toString(), false, reader);
+        
+        if (filesToCompact.get(mapFile).isTimeSet()) {
+          iter = new TimeSettingIterator(iter, filesToCompact.get(mapFile).getTime());
+        }
+        
+        iters.add(iter);
+        
+      } catch (Throwable e) {
+        
+        ProblemReports.getInstance().report(new ProblemReport(extent.getTableId().toString(), ProblemType.FILE_READ, mapFile.path().toString(), e));
+        
+        log.warn("Some problem opening map file " + mapFile + " " + e.getMessage(), e);
+        // failed to open some map file... close the ones that were opened
+        for (FileSKVIterator reader : readers) {
+          try {
+            reader.close();
+          } catch (Throwable e2) {
+            log.warn("Failed to close map file", e2);
+          }
+        }
+        
+        readers.clear();
+        
+        if (e instanceof IOException)
+          throw (IOException) e;
+        throw new IOException("Failed to open map data files", e);
+      }
+    }
+    
+    return iters;
+  }
+  
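+  /**
+   * Compacts a single locality group: stacks the file readers, the optional in-memory map, the
+   * system iterators, and the configured table iterators, then copies surviving entries to the
+   * output writer.
+   */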
+  private void compactLocalityGroup(String lgName, Set<ByteSequence> columnFamilies, boolean inclusive, FileSKVWriter mfw, CompactionStats majCStats)
+      throws IOException, CompactionCanceledException {
+    ArrayList<FileSKVIterator> readers = new ArrayList<FileSKVIterator>(filesToCompact.size());
+    Span span = Trace.start("compact");
+    try {
+      long entriesCompacted = 0;
+      List<SortedKeyValueIterator<Key,Value>> iters = openMapDataFiles(lgName, readers);
+      
+      if (imm != null) {
+        iters.add(imm.compactionIterator());
+      }
+      
+      CountingIterator citr = new CountingIterator(new MultiIterator(iters, extent.toDataRange()), entriesRead);
+      DeletingIterator delIter = new DeletingIterator(citr, propogateDeletes);
+      ColumnFamilySkippingIterator cfsi = new ColumnFamilySkippingIterator(delIter);
+      
+
+      
+      TabletIteratorEnvironment iterEnv;
+      if (env.getIteratorScope() == IteratorScope.majc)
+        iterEnv = new TabletIteratorEnvironment(IteratorScope.majc, !propogateDeletes, acuTableConf);
+      else if (env.getIteratorScope() == IteratorScope.minc)
+        iterEnv = new TabletIteratorEnvironment(IteratorScope.minc, acuTableConf);
+      else
+        throw new IllegalArgumentException();
+      
+      SortedKeyValueIterator<Key,Value> itr = iterEnv.getTopLevelIterator(IteratorUtil.loadIterators(env.getIteratorScope(), cfsi, extent, acuTableConf,
+          iterators, iterEnv));
+      
+      itr.seek(extent.toDataRange(), columnFamilies, inclusive);
+      
+      if (!inclusive) {
+        mfw.startDefaultLocalityGroup();
+      } else {
+        mfw.startNewLocalityGroup(lgName, columnFamilies);
+      }
+      
+      Span write = Trace.start("write");
+      try {
+        while (itr.hasTop() && env.isCompactionEnabled()) {
+          mfw.append(itr.getTopKey(), itr.getTopValue());
+          itr.next();
+          entriesCompacted++;
+          
+          if (entriesCompacted % 1024 == 0) {
+            // periodically publish progress; avoid updating on every entry since the counter is volatile
+            entriesWritten.addAndGet(1024);
+          }
+        }
+
+        if (itr.hasTop() && !env.isCompactionEnabled()) {
+          // cancel major compaction operation
+          try {
+            try {
+              mfw.close();
+            } catch (IOException e) {
+              log.error(e, e);
+            }
+            fs.deleteRecursively(outputFile.path());
+          } catch (Exception e) {
+            log.warn("Failed to delete Canceled compaction output file " + outputFile, e);
+          }
+          throw new CompactionCanceledException();
+        }
+        
+      } finally {
+        CompactionStats lgMajcStats = new CompactionStats(citr.getCount(), entriesCompacted);
+        majCStats.add(lgMajcStats);
+        write.stop();
+      }
+      
+    } finally {
+      // close sequence files opened
+      for (FileSKVIterator reader : readers) {
+        try {
+          reader.close();
+        } catch (Throwable e) {
+          log.warn("Failed to close map file", e);
+        }
+      }
+      span.stop();
+    }
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/a10587ed/server/tserver/src/main/java/org/apache/accumulo/tserver/ConditionalMutationSet.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ConditionalMutationSet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ConditionalMutationSet.java
new file mode 100644
index 0000000..41281b5
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ConditionalMutationSet.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.tserver.data.ServerConditionalMutation;
+import org.apache.hadoop.io.WritableComparator;
+
+/**
+ * Utility methods for preparing batches of conditional mutations: deferring mutations that
+ * target the same row and sorting mutations by row within each tablet.
+ */
+public class ConditionalMutationSet {
+
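+  /**
+   * Splits a batch of conditional mutations into those that can be processed now and those that
+   * must be deferred to a later pass.
+   */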
+  static interface DeferFilter {
+    void defer(List<ServerConditionalMutation> scml, List<ServerConditionalMutation> okMutations, List<ServerConditionalMutation> deferred);
+  }
+  
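+  /**
+   * Defers mutations whose row equals that of the preceding mutation, so at most one mutation per
+   * row is processed in a single pass; the list is expected to be sorted by row so duplicates are
+   * adjacent.
+   */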
+  static class DuplicateFitler implements DeferFilter {
+    public void defer(List<ServerConditionalMutation> scml, List<ServerConditionalMutation> okMutations, List<ServerConditionalMutation> deferred) {
+      okMutations.add(scml.get(0));
+      for (int i = 1; i < scml.size(); i++) {
+        if (Arrays.equals(scml.get(i - 1).getRow(), scml.get(i).getRow())) {
+          deferred.add(scml.get(i));
+        } else {
+          okMutations.add(scml.get(i));
+        }
+      }
+    }
+  }
+  
+  static void defer(Map<KeyExtent,List<ServerConditionalMutation>> updates, Map<KeyExtent,List<ServerConditionalMutation>> deferredMutations, DeferFilter filter) {
+    for (Entry<KeyExtent,List<ServerConditionalMutation>> entry : updates.entrySet()) {
+      List<ServerConditionalMutation> scml = entry.getValue();
+      List<ServerConditionalMutation> okMutations = new ArrayList<ServerConditionalMutation>(scml.size());
+      List<ServerConditionalMutation> deferred = new ArrayList<ServerConditionalMutation>();
+      filter.defer(scml, okMutations, deferred);
+      
+      if (deferred.size() > 0) {
+        scml.clear();
+        scml.addAll(okMutations);
+        List<ServerConditionalMutation> l = deferredMutations.get(entry.getKey());
+        if (l == null) {
+          l = deferred;
+          deferredMutations.put(entry.getKey(), l);
+        } else {
+          l.addAll(deferred);
+        }
+
+      }
+    }
+  }
+  
+  static void deferDuplicatesRows(Map<KeyExtent,List<ServerConditionalMutation>> updates, Map<KeyExtent,List<ServerConditionalMutation>> deferred) {
+    defer(updates, deferred, new DuplicateFitler());
+  }
+
+  static void sortConditionalMutations(Map<KeyExtent,List<ServerConditionalMutation>> updates) {
+    for (Entry<KeyExtent,List<ServerConditionalMutation>> entry : updates.entrySet()) {
+      // TODO check if its already in sorted order?
+      // TODO maybe the potential benefit of sorting is not worth the cost
+      Collections.sort(entry.getValue(), new Comparator<ServerConditionalMutation>() {
+        @Override
+        public int compare(ServerConditionalMutation o1, ServerConditionalMutation o2) {
+          return WritableComparator.compareBytes(o1.getRow(), 0, o1.getRow().length, o2.getRow(), 0, o2.getRow().length);
+        }
+      });
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/a10587ed/server/tserver/src/main/java/org/apache/accumulo/tserver/EndOfTableException.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/EndOfTableException.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/EndOfTableException.java
new file mode 100644
index 0000000..d2956ce
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/EndOfTableException.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver;
+
+public class EndOfTableException extends Exception {
+  
+  private static final long serialVersionUID = 1L;
+  
+}


Mime
View raw message