accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ctubb...@apache.org
Subject [07/54] [partial] ACCUMULO-658, ACCUMULO-656 Split server into separate modules
Date Fri, 01 Nov 2013 00:55:46 GMT
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/security/handler/ZKSecurityTool.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/security/handler/ZKSecurityTool.java b/server/src/main/java/org/apache/accumulo/server/security/handler/ZKSecurityTool.java
deleted file mode 100644
index 3b9d8b2..0000000
--- a/server/src/main/java/org/apache/accumulo/server/security/handler/ZKSecurityTool.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.security.handler;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.security.SecureRandom;
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.security.SystemPermission;
-import org.apache.accumulo.core.security.TablePermission;
-import org.apache.log4j.Logger;
-
-/**
- * All the static too methods used for this class, so that we can separate out stuff that isn't using ZooKeeper. That way, we can check the synchronization
- * model more easily, as we only need to check to make sure zooCache is cleared when things are written to ZooKeeper in methods that might use it. These won't,
- * and so don't need to be checked.
- */
-class ZKSecurityTool {
-  private static final Logger log = Logger.getLogger(ZKSecurityTool.class);
-  private static final int SALT_LENGTH = 8;
-  
-  // Generates a byte array salt of length SALT_LENGTH
-  private static byte[] generateSalt() {
-    final SecureRandom random = new SecureRandom();
-    byte[] salt = new byte[SALT_LENGTH];
-    random.nextBytes(salt);
-    return salt;
-  }
-  
-  private static byte[] hash(byte[] raw) throws NoSuchAlgorithmException {
-    MessageDigest md = MessageDigest.getInstance(Constants.PW_HASH_ALGORITHM);
-    md.update(raw);
-    return md.digest();
-  }
-  
-  public static boolean checkPass(byte[] password, byte[] zkData) {
-    if (zkData == null)
-      return false;
-    
-    byte[] salt = new byte[SALT_LENGTH];
-    System.arraycopy(zkData, 0, salt, 0, SALT_LENGTH);
-    byte[] passwordToCheck;
-    try {
-      passwordToCheck = convertPass(password, salt);
-    } catch (NoSuchAlgorithmException e) {
-      log.error("Count not create hashed password", e);
-      return false;
-    }
-    return java.util.Arrays.equals(passwordToCheck, zkData);
-  }
-  
-  public static byte[] createPass(byte[] password) throws AccumuloException {
-    byte[] salt = generateSalt();
-    try {
-      return convertPass(password, salt);
-    } catch (NoSuchAlgorithmException e) {
-      log.error("Count not create hashed password", e);
-      throw new AccumuloException("Count not create hashed password", e);
-    }
-  }
-  
-  private static byte[] convertPass(byte[] password, byte[] salt) throws NoSuchAlgorithmException {
-    byte[] plainSalt = new byte[password.length + SALT_LENGTH];
-    System.arraycopy(password, 0, plainSalt, 0, password.length);
-    System.arraycopy(salt, 0, plainSalt, password.length, SALT_LENGTH);
-    byte[] hashed = hash(plainSalt);
-    byte[] saltedHash = new byte[SALT_LENGTH + hashed.length];
-    System.arraycopy(salt, 0, saltedHash, 0, SALT_LENGTH);
-    System.arraycopy(hashed, 0, saltedHash, SALT_LENGTH, hashed.length);
-    return saltedHash; // contains salt+hash(password+salt)
-  }
-  
-  public static Authorizations convertAuthorizations(byte[] authorizations) {
-    return new Authorizations(authorizations);
-  }
-  
-  public static byte[] convertAuthorizations(Authorizations authorizations) {
-    return authorizations.getAuthorizationsArray();
-  }
-  
-  public static byte[] convertSystemPermissions(Set<SystemPermission> systempermissions) {
-    ByteArrayOutputStream bytes = new ByteArrayOutputStream(systempermissions.size());
-    DataOutputStream out = new DataOutputStream(bytes);
-    try {
-      for (SystemPermission sp : systempermissions)
-        out.writeByte(sp.getId());
-    } catch (IOException e) {
-      log.error(e, e);
-      throw new RuntimeException(e); // this is impossible with ByteArrayOutputStream; crash hard if this happens
-    }
-    return bytes.toByteArray();
-  }
-  
-  public static Set<SystemPermission> convertSystemPermissions(byte[] systempermissions) {
-    ByteArrayInputStream bytes = new ByteArrayInputStream(systempermissions);
-    DataInputStream in = new DataInputStream(bytes);
-    Set<SystemPermission> toReturn = new HashSet<SystemPermission>();
-    try {
-      while (in.available() > 0)
-        toReturn.add(SystemPermission.getPermissionById(in.readByte()));
-    } catch (IOException e) {
-      log.error("User database is corrupt; error converting system permissions", e);
-      toReturn.clear();
-    }
-    return toReturn;
-  }
-  
-  public static byte[] convertTablePermissions(Set<TablePermission> tablepermissions) {
-    ByteArrayOutputStream bytes = new ByteArrayOutputStream(tablepermissions.size());
-    DataOutputStream out = new DataOutputStream(bytes);
-    try {
-      for (TablePermission tp : tablepermissions)
-        out.writeByte(tp.getId());
-    } catch (IOException e) {
-      log.error(e, e);
-      throw new RuntimeException(e); // this is impossible with ByteArrayOutputStream; crash hard if this happens
-    }
-    return bytes.toByteArray();
-  }
-  
-  public static Set<TablePermission> convertTablePermissions(byte[] tablepermissions) {
-    Set<TablePermission> toReturn = new HashSet<TablePermission>();
-    for (byte b : tablepermissions)
-      toReturn.add(TablePermission.getPermissionById(b));
-    return toReturn;
-  }
-  
-  public static String getInstancePath(String instanceId) {
-    return Constants.ZROOT + "/" + instanceId;
-  }
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/tabletserver/BulkFailedCopyProcessor.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/BulkFailedCopyProcessor.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/BulkFailedCopyProcessor.java
deleted file mode 100644
index c543881..0000000
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/BulkFailedCopyProcessor.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.tabletserver;
-
-import java.io.IOException;
-
-import org.apache.accumulo.core.util.CachedConfiguration;
-import org.apache.accumulo.server.conf.ServerConfiguration;
-import org.apache.accumulo.server.trace.TraceFileSystem;
-import org.apache.accumulo.server.zookeeper.DistributedWorkQueue.Processor;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.Path;
-import org.apache.log4j.Logger;
-
-/**
- * Copy failed bulk imports.
- */
-public class BulkFailedCopyProcessor implements Processor {
-  
-  private static final Logger log = Logger.getLogger(BulkFailedCopyProcessor.class);
-  
-  @Override
-  public Processor newProcessor() {
-    return new BulkFailedCopyProcessor();
-  }
-  
-  @Override
-  public void process(String workID, byte[] data) {
-    
-    String paths[] = new String(data).split(",");
-    
-    Path orig = new Path(paths[0]);
-    Path dest = new Path(paths[1]);
-    Path tmp = new Path(dest.getParent(), dest.getName() + ".tmp");
-    
-    try {
-      FileSystem fs = TraceFileSystem.wrap(org.apache.accumulo.core.file.FileUtil.getFileSystem(CachedConfiguration.getInstance(),
-          ServerConfiguration.getSiteConfiguration()));
-      
-      FileUtil.copy(fs, orig, fs, tmp, false, true, CachedConfiguration.getInstance());
-      fs.rename(tmp, dest);
-      log.debug("copied " + orig + " to " + dest);
-    } catch (IOException ex) {
-      try {
-        FileSystem fs = TraceFileSystem.wrap(org.apache.accumulo.core.file.FileUtil.getFileSystem(CachedConfiguration.getInstance(),
-            ServerConfiguration.getSiteConfiguration()));
-        
-        fs.create(dest).close();
-        log.warn(" marked " + dest + " failed", ex);
-      } catch (IOException e) {
-        log.error("Unable to create failure flag file " + dest, e);
-      }
-    }
-
-  }
-  
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/tabletserver/CompactionQueue.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/CompactionQueue.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/CompactionQueue.java
deleted file mode 100644
index f5f21e4..0000000
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/CompactionQueue.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.tabletserver;
-
-import java.util.AbstractQueue;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-@SuppressWarnings({"rawtypes", "unchecked"})
-public class CompactionQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> {
-  
-  private List<Comparable> task = new LinkedList<Comparable>();
-  
-  @Override
-  public synchronized Runnable poll() {
-    if (task.size() == 0)
-      return null;
-    
-    Comparable min = Collections.min(task);
-    task.remove(min);
-    return (Runnable) min;
-  }
-  
-  @Override
-  public synchronized Runnable peek() {
-    if (task.size() == 0)
-      return null;
-    
-    Comparable min = Collections.min(task);
-    return (Runnable) min;
-  }
-  
-  @Override
-  public synchronized boolean offer(Runnable e) {
-    task.add((Comparable) e);
-    notify();
-    return true;
-  }
-  
-  @Override
-  public synchronized void put(Runnable e) throws InterruptedException {
-    task.add((Comparable) e);
-    notify();
-  }
-  
-  @Override
-  public synchronized boolean offer(Runnable e, long timeout, TimeUnit unit) throws InterruptedException {
-    task.add((Comparable) e);
-    notify();
-    return true;
-  }
-  
-  @Override
-  public synchronized Runnable take() throws InterruptedException {
-    while (task.size() == 0) {
-      wait();
-    }
-    
-    return poll();
-  }
-  
-  @Override
-  public synchronized Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
-    if (task.size() == 0) {
-      wait(unit.toMillis(timeout));
-    }
-    
-    if (task.size() == 0)
-      return null;
-    
-    return poll();
-  }
-  
-  @Override
-  public synchronized int remainingCapacity() {
-    return Integer.MAX_VALUE;
-  }
-  
-  @Override
-  public synchronized int drainTo(Collection<? super Runnable> c) {
-    return drainTo(c, task.size());
-  }
-  
-  @Override
-  public synchronized int drainTo(Collection<? super Runnable> c, int maxElements) {
-    Collections.sort(task);
-    
-    int num = Math.min(task.size(), maxElements);
-    
-    Iterator<Comparable> iter = task.iterator();
-    for (int i = 0; i < num; i++) {
-      c.add((Runnable) iter.next());
-      iter.remove();
-    }
-    
-    return num;
-  }
-  
-  @Override
-  public synchronized Iterator<Runnable> iterator() {
-    Collections.sort(task);
-    
-    final Iterator<Comparable> iter = task.iterator();
-    
-    return new Iterator<Runnable>() {
-      
-      @Override
-      public boolean hasNext() {
-        return iter.hasNext();
-      }
-      
-      @Override
-      public Runnable next() {
-        return (Runnable) iter.next();
-      }
-      
-      @Override
-      public void remove() {
-        iter.remove();
-      }
-    };
-  }
-  
-  @Override
-  public synchronized int size() {
-    return task.size();
-  }
-  
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/tabletserver/CompactionStats.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/CompactionStats.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/CompactionStats.java
deleted file mode 100644
index 086c5cd..0000000
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/CompactionStats.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.tabletserver;
-
-public class CompactionStats {
-  private long entriesRead;
-  private long entriesWritten;
-  private long fileSize;
-  
-  CompactionStats(long er, long ew) {
-    this.setEntriesRead(er);
-    this.setEntriesWritten(ew);
-  }
-  
-  public CompactionStats() {}
-  
-  private void setEntriesRead(long entriesRead) {
-    this.entriesRead = entriesRead;
-  }
-  
-  public long getEntriesRead() {
-    return entriesRead;
-  }
-  
-  private void setEntriesWritten(long entriesWritten) {
-    this.entriesWritten = entriesWritten;
-  }
-  
-  public long getEntriesWritten() {
-    return entriesWritten;
-  }
-  
-  public void add(CompactionStats mcs) {
-    this.entriesRead += mcs.entriesRead;
-    this.entriesWritten += mcs.entriesWritten;
-  }
-  
-  public void setFileSize(long fileSize) {
-    this.fileSize = fileSize;
-  }
-  
-  public long getFileSize() {
-    return this.fileSize;
-  }
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/tabletserver/CompactionWatcher.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/CompactionWatcher.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/CompactionWatcher.java
deleted file mode 100644
index 2ef2c24..0000000
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/CompactionWatcher.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.tabletserver;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.accumulo.core.conf.AccumuloConfiguration;
-import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.server.tabletserver.Compactor.CompactionInfo;
-import org.apache.accumulo.server.util.time.SimpleTimer;
-import org.apache.log4j.Logger;
-
-/**
- * 
- */
-public class CompactionWatcher implements Runnable {
-  private Map<List<Long>,ObservedCompactionInfo> observedCompactions = new HashMap<List<Long>,ObservedCompactionInfo>();
-  private AccumuloConfiguration config;
-  private static boolean watching = false;
-  
-  private static class ObservedCompactionInfo {
-    CompactionInfo compactionInfo;
-    long firstSeen;
-    boolean loggedWarning;
-    
-    ObservedCompactionInfo(CompactionInfo ci, long time) {
-      this.compactionInfo = ci;
-      this.firstSeen = time;
-    }
-  }
-
-  public CompactionWatcher(AccumuloConfiguration config) {
-    this.config = config;
-  }
-
-  public void run() {
-    List<CompactionInfo> runningCompactions = Compactor.getRunningCompactions();
-    
-    Set<List<Long>> newKeys = new HashSet<List<Long>>();
-    
-    long time = System.currentTimeMillis();
-
-    for (CompactionInfo ci : runningCompactions) {
-      List<Long> compactionKey = Arrays.asList(ci.getID(), ci.getEntriesRead(), ci.getEntriesWritten());
-      newKeys.add(compactionKey);
-      
-      if (!observedCompactions.containsKey(compactionKey)) {
-        observedCompactions.put(compactionKey, new ObservedCompactionInfo(ci, time));
-      }
-    }
-    
-    // look for compactions that finished or made progress and logged a warning
-    HashMap<List<Long>,ObservedCompactionInfo> copy = new HashMap<List<Long>,ObservedCompactionInfo>(observedCompactions);
-    copy.keySet().removeAll(newKeys);
-    
-    for (ObservedCompactionInfo oci : copy.values()) {
-      if (oci.loggedWarning) {
-        Logger.getLogger(CompactionWatcher.class).info("Compaction of " + oci.compactionInfo.getExtent() + " is no longer stuck");
-      }
-    }
-
-    // remove any compaction that completed or made progress
-    observedCompactions.keySet().retainAll(newKeys);
-    
-    long warnTime = config.getTimeInMillis(Property.TSERV_COMPACTION_WARN_TIME);
-
-    // check for stuck compactions
-    for (ObservedCompactionInfo oci : observedCompactions.values()) {
-      if (time - oci.firstSeen > warnTime && !oci.loggedWarning) {
-        Thread compactionThread = oci.compactionInfo.getThread();
-        if (compactionThread != null) {
-          StackTraceElement[] trace = compactionThread.getStackTrace();
-          Exception e = new Exception("Possible stack trace of compaction stuck on " + oci.compactionInfo.getExtent());
-          e.setStackTrace(trace);
-          Logger.getLogger(CompactionWatcher.class).warn(
-              "Compaction of " + oci.compactionInfo.getExtent() + " to " + oci.compactionInfo.getOutputFile() + " has not made progress for at least "
-                  + (time - oci.firstSeen) + "ms", e);
-          oci.loggedWarning = true;
-        }
-      }
-    }
-  }
-
-  public static synchronized void startWatching(AccumuloConfiguration config) {
-    if (!watching) {
-      SimpleTimer.getInstance().schedule(new CompactionWatcher(config), 10000, 10000);
-      watching = true;
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java
deleted file mode 100644
index fb66661..0000000
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java
+++ /dev/null
@@ -1,546 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.tabletserver;
-
-import java.io.IOException;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.accumulo.core.client.IteratorSetting;
-import org.apache.accumulo.core.conf.AccumuloConfiguration;
-import org.apache.accumulo.core.data.ByteSequence;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.KeyExtent;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.data.thrift.IterInfo;
-import org.apache.accumulo.core.file.FileOperations;
-import org.apache.accumulo.core.file.FileSKVIterator;
-import org.apache.accumulo.core.file.FileSKVWriter;
-import org.apache.accumulo.core.iterators.IteratorEnvironment;
-import org.apache.accumulo.core.iterators.IteratorUtil;
-import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
-import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
-import org.apache.accumulo.core.iterators.WrappingIterator;
-import org.apache.accumulo.core.iterators.system.ColumnFamilySkippingIterator;
-import org.apache.accumulo.core.iterators.system.DeletingIterator;
-import org.apache.accumulo.core.iterators.system.MultiIterator;
-import org.apache.accumulo.core.iterators.system.TimeSettingIterator;
-import org.apache.accumulo.core.metadata.schema.DataFileValue;
-import org.apache.accumulo.core.tabletserver.thrift.ActiveCompaction;
-import org.apache.accumulo.core.tabletserver.thrift.CompactionReason;
-import org.apache.accumulo.core.tabletserver.thrift.CompactionType;
-import org.apache.accumulo.core.util.LocalityGroupUtil;
-import org.apache.accumulo.core.util.LocalityGroupUtil.LocalityGroupConfigurationError;
-import org.apache.accumulo.server.fs.FileRef;
-import org.apache.accumulo.server.fs.VolumeManager;
-import org.apache.accumulo.server.problems.ProblemReport;
-import org.apache.accumulo.server.problems.ProblemReportingIterator;
-import org.apache.accumulo.server.problems.ProblemReports;
-import org.apache.accumulo.server.problems.ProblemType;
-import org.apache.accumulo.server.tabletserver.Tablet.MinorCompactionReason;
-import org.apache.accumulo.server.tabletserver.compaction.MajorCompactionReason;
-import org.apache.accumulo.trace.instrument.Span;
-import org.apache.accumulo.trace.instrument.Trace;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.log4j.Logger;
-
-
-public class Compactor implements Callable<CompactionStats> {
-  
-  public static class CountingIterator extends WrappingIterator {
-    
-    private long count;
-    private ArrayList<CountingIterator> deepCopies;
-    private AtomicLong entriesRead;
-    
-    public CountingIterator deepCopy(IteratorEnvironment env) {
-      return new CountingIterator(this, env);
-    }
-    
-    private CountingIterator(CountingIterator other, IteratorEnvironment env) {
-      setSource(other.getSource().deepCopy(env));
-      count = 0;
-      this.deepCopies = other.deepCopies;
-      this.entriesRead = other.entriesRead;
-      deepCopies.add(this);
-    }
-    
-    public CountingIterator(SortedKeyValueIterator<Key,Value> source, AtomicLong entriesRead) {
-      deepCopies = new ArrayList<Compactor.CountingIterator>();
-      this.setSource(source);
-      count = 0;
-      this.entriesRead = entriesRead;
-    }
-    
-    @Override
-    public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) {
-      throw new UnsupportedOperationException();
-    }
-    
-    @Override
-    public void next() throws IOException {
-      super.next();
-      count++;
-      if (count % 1024 == 0) {
-        entriesRead.addAndGet(1024);
-      }
-    }
-    
-    public long getCount() {
-      long sum = 0;
-      for (CountingIterator dc : deepCopies) {
-        sum += dc.count;
-      }
-      
-      return count + sum;
-    }
-  }
-
-  private static final Logger log = Logger.getLogger(Compactor.class);
-  
-  static class CompactionCanceledException extends Exception {
-    private static final long serialVersionUID = 1L;
-  }
-  
-  static interface CompactionEnv {
-    boolean isCompactionEnabled();
-    
-    IteratorScope getIteratorScope();
-  }
-  
-  private Map<FileRef,DataFileValue> filesToCompact;
-  private InMemoryMap imm;
-  private FileRef outputFile;
-  private boolean propogateDeletes;
-  private AccumuloConfiguration acuTableConf;
-  private CompactionEnv env;
-  private Configuration conf;
-  private VolumeManager fs;
-  protected KeyExtent extent;
-  private List<IteratorSetting> iterators;
-  
-  // things to report
-  private String currentLocalityGroup = "";
-  private long startTime;
-
-  private MajorCompactionReason reason;
-  protected MinorCompactionReason mincReason;
-  
-  private AtomicLong entriesRead = new AtomicLong(0);
-  private AtomicLong entriesWritten = new AtomicLong(0);
-  private DateFormat dateFormatter = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS");
-  
-  private static AtomicLong nextCompactorID = new AtomicLong(0);
-  
-  // a unique id to identify a compactor
-  private long compactorID = nextCompactorID.getAndIncrement();
-
-  protected volatile Thread thread;
-
-  private synchronized void setLocalityGroup(String name) {
-    this.currentLocalityGroup = name;
-  }
-
-  private void clearStats() {
-    entriesRead.set(0);
-    entriesWritten.set(0);
-  }
-
-  protected static Set<Compactor> runningCompactions = Collections.synchronizedSet(new HashSet<Compactor>());
-  
-  public static class CompactionInfo {
-    
-    private Compactor compactor;
-    private String localityGroup;
-    private long entriesRead;
-    private long entriesWritten;
-    
-    CompactionInfo(Compactor compactor) {
-      this.localityGroup = compactor.currentLocalityGroup;
-      this.entriesRead = compactor.entriesRead.get();
-      this.entriesWritten = compactor.entriesWritten.get();
-      this.compactor = compactor;
-    }
-
-    public long getID() {
-      return compactor.compactorID;
-    }
-    
-    public KeyExtent getExtent() {
-      return compactor.getExtent();
-    }
-    
-    public long getEntriesRead() {
-      return entriesRead;
-    }
-    
-    public long getEntriesWritten() {
-      return entriesWritten;
-    }
-
-    public Thread getThread() {
-      return compactor.thread;
-    }
-
-    public String getOutputFile() {
-      return compactor.getOutputFile();
-    }
-
    /**
     * Builds a thrift {@link ActiveCompaction} describing this compaction: its
     * type, the reason it was started, configured iterators, input files, and
     * progress counters.
     */
    public ActiveCompaction toThrift() {
      
      CompactionType type;
      
      // A non-null in-memory map means this is a minor compaction (flush); when
      // it also merges existing files it is reported as MERGE. Otherwise it is
      // a major compaction: FULL when deletes are not propagated (all of the
      // tablet's files are included), MAJOR otherwise.
      if (compactor.imm != null)
        if (compactor.filesToCompact.size() > 0)
          type = CompactionType.MERGE;
        else
          type = CompactionType.MINOR;
      else if (!compactor.propogateDeletes)
        type = CompactionType.FULL;
      else
        type = CompactionType.MAJOR;
      
      CompactionReason reason;
      
      // Minor compactions report the minc reason, major compactions the majc
      // reason; unrecognized values fall back to SYSTEM.
      if (compactor.imm != null)
        switch(compactor.mincReason){
          case USER:
            reason = CompactionReason.USER;
            break;
          case CLOSE:
            reason = CompactionReason.CLOSE;
            break;
          case SYSTEM:
          default:
            reason = CompactionReason.SYSTEM;
            break;
        }
      else
        switch (compactor.reason) {
          case USER:
            reason = CompactionReason.USER;
            break;
          case CHOP:
            reason = CompactionReason.CHOP;
            break;
          case IDLE:
            reason = CompactionReason.IDLE;
            break;
          case NORMAL:
          default:
            reason = CompactionReason.SYSTEM;
            break;
        }
      
      // Convert the iterator settings into thrift IterInfo plus a parallel
      // name -> options map.
      List<IterInfo> iiList = new ArrayList<IterInfo>();
      Map<String,Map<String,String>> iterOptions = new HashMap<String,Map<String,String>>();
      
      for (IteratorSetting iterSetting : compactor.iterators) {
        iiList.add(new IterInfo(iterSetting.getPriority(), iterSetting.getIteratorClass(), iterSetting.getName()));
        iterOptions.put(iterSetting.getName(), iterSetting.getOptions());
      }
      List<String> filesToCompact = new ArrayList<String>();
      for (FileRef ref : compactor.filesToCompact.keySet())
        filesToCompact.add(ref.toString());
      return new ActiveCompaction(compactor.extent.toThrift(), System.currentTimeMillis() - compactor.startTime, filesToCompact, compactor.outputFile.toString(), type, reason, localityGroup, entriesRead, entriesWritten, iiList, iterOptions);
    }
-  }
-  
-  public static List<CompactionInfo> getRunningCompactions() {
-    ArrayList<CompactionInfo> compactions = new ArrayList<Compactor.CompactionInfo>();
-    
-    synchronized (runningCompactions) {
-      for (Compactor compactor : runningCompactions) {
-        compactions.add(new CompactionInfo(compactor));
-      }
-    }
-    
-    return compactions;
-  }
-
  /**
   * @param conf hadoop configuration used when opening input files
   * @param fs volume manager used to resolve file paths to filesystems
   * @param files input files to compact, with their stored metadata
   * @param imm in-memory map to flush, or null for a files-only (major) compaction
   * @param outputFile destination file for the compacted data
   * @param propogateDeletes whether delete entries must be carried into the output
   * @param acuTableConf configuration of the table being compacted
   * @param extent tablet being compacted
   * @param env environment supplying the iterator scope and cancellation checks
   * @param iterators user iterators to apply during the compaction
   * @param reason why this major compaction was started (null for minor compactions)
   */
  Compactor(Configuration conf, VolumeManager fs, Map<FileRef,DataFileValue> files, InMemoryMap imm, FileRef outputFile, boolean propogateDeletes,
      AccumuloConfiguration acuTableConf, KeyExtent extent, CompactionEnv env, List<IteratorSetting> iterators, MajorCompactionReason reason) {
    this.extent = extent;
    this.conf = conf;
    this.fs = fs;
    this.filesToCompact = files;
    this.imm = imm;
    this.outputFile = outputFile;
    this.propogateDeletes = propogateDeletes;
    this.acuTableConf = acuTableConf;
    this.env = env;
    this.iterators = iterators;
    this.reason = reason;
    
    // recorded so ActiveCompaction can report how long this compaction has run
    startTime = System.currentTimeMillis();
  }
-  
  /** Convenience constructor for compactions with no user iterators and no major compaction reason. */
  Compactor(Configuration conf, VolumeManager fs, Map<FileRef,DataFileValue> files, InMemoryMap imm, FileRef outputFile, boolean propogateDeletes,
      AccumuloConfiguration acuTableConf, KeyExtent extent, CompactionEnv env) {
    this(conf, fs, files, imm, outputFile, propogateDeletes, acuTableConf, extent, env, new ArrayList<IteratorSetting>(), null);
  }
-  
  /** @return the volume manager this compactor opens and deletes files through. */
  public VolumeManager getFileSystem() {
    return fs;
  }
-  
  /** @return the tablet extent being compacted. */
  KeyExtent getExtent() {
    return extent;
  }
-  
  /** @return string form of the compaction's output file path. */
  String getOutputFile() {
    return outputFile.toString();
  }
-  
  /**
   * Runs the compaction: opens the output writer, compacts each configured
   * locality group and then the default group, closes and verifies the output
   * file, and returns the resulting stats. On any failure the partially
   * written output file is closed and deleted.
   *
   * @return stats (entries read/written, file size) for the whole compaction
   * @throws IOException if reading inputs or writing/verifying the output fails
   * @throws CompactionCanceledException if the environment disables the compaction mid-run
   */
  @Override
  public CompactionStats call() throws IOException, CompactionCanceledException {
    
    FileSKVWriter mfw = null;
    
    CompactionStats majCStats = new CompactionStats();

    // register with the running-compactions set; add() returning true means we
    // are responsible for removing ourselves in the finally block
    boolean remove = runningCompactions.add(this);
    
    clearStats();

    // temporarily rename the thread so stack dumps identify the compaction
    String oldThreadName = Thread.currentThread().getName();
    String newThreadName = "MajC compacting " + extent.toString() + " started " + dateFormatter.format(new Date()) + " file: " + outputFile;
    Thread.currentThread().setName(newThreadName);
    thread = Thread.currentThread();
    try {
      FileOperations fileFactory = FileOperations.getInstance();
      FileSystem ns = this.fs.getFileSystemByPath(outputFile.path());
      mfw = fileFactory.openWriter(outputFile.path().toString(), ns, ns.getConf(), acuTableConf);
      
      Map<String,Set<ByteSequence>> lGroups;
      try {
        lGroups = LocalityGroupUtil.getLocalityGroups(acuTableConf);
      } catch (LocalityGroupConfigurationError e) {
        throw new IOException(e);
      }
      
      long t1 = System.currentTimeMillis();
      
      HashSet<ByteSequence> allColumnFamilies = new HashSet<ByteSequence>();
      
      // compact each configured locality group, tracking which column families
      // they cover so the default group below can exclude them
      if (mfw.supportsLocalityGroups()) {
        for (Entry<String,Set<ByteSequence>> entry : lGroups.entrySet()) {
          setLocalityGroup(entry.getKey());
          compactLocalityGroup(entry.getKey(), entry.getValue(), true, mfw, majCStats);
          allColumnFamilies.addAll(entry.getValue());
        }
      }
      
      // default locality group: everything NOT in the groups compacted above
      setLocalityGroup("");
      compactLocalityGroup(null, allColumnFamilies, false, mfw, majCStats);
      
      long t2 = System.currentTimeMillis();
      
      FileSKVWriter mfwTmp = mfw;
      mfw = null; // set this to null so we do not try to close it again in finally if the close fails
      mfwTmp.close(); // if the close fails it will cause the compaction to fail
      
      // Verify the file, since hadoop 0.20.2 sometimes lies about the success of close()
      try {
        FileSKVIterator openReader = fileFactory.openReader(outputFile.path().toString(), false, ns, ns.getConf(), acuTableConf);
        openReader.close();
      } catch (IOException ex) {
        log.error("Verification of successful compaction fails!!! " + extent + " " + outputFile, ex);
        throw ex;
      }
      
      log.debug(String.format("Compaction %s %,d read | %,d written | %,6d entries/sec | %6.3f secs", extent, majCStats.getEntriesRead(),
          majCStats.getEntriesWritten(), (int) (majCStats.getEntriesRead() / ((t2 - t1) / 1000.0)), (t2 - t1) / 1000.0));
      
      majCStats.setFileSize(fileFactory.getFileSize(outputFile.path().toString(), ns, ns.getConf(), acuTableConf));
      return majCStats;
    } catch (IOException e) {
      log.error(e, e);
      throw e;
    } catch (RuntimeException e) {
      log.error(e, e);
      throw e;
    } finally {
      Thread.currentThread().setName(oldThreadName);
      if (remove) {
        thread = null;
        runningCompactions.remove(this);
      }

      try {
        if (mfw != null) {
          // compaction must not have finished successfully, so close its output file
          try {
            mfw.close();
          } finally {
            if (!fs.deleteRecursively(outputFile.path()))
              if (fs.exists(outputFile.path()))
                log.error("Unable to delete " + outputFile);
          }
        }
      } catch (IOException e) {
        log.warn(e, e);
      }
    }
  }
-
  /**
   * Opens every input file of this compaction, wrapping each reader with a
   * problem-reporting iterator and, when the file's stored time is set, a
   * time-setting iterator. If any file fails to open, all readers opened so
   * far are closed, a FILE_READ problem is reported, and an IOException is thrown.
   *
   * @param lgName locality group name (currently unused in this method)
   * @param readers output parameter collecting raw readers so the caller can close them
   * @return iterators over the input files
   */
  private List<SortedKeyValueIterator<Key,Value>> openMapDataFiles(String lgName, ArrayList<FileSKVIterator> readers) throws IOException {
    
    List<SortedKeyValueIterator<Key,Value>> iters = new ArrayList<SortedKeyValueIterator<Key,Value>>(filesToCompact.size());
    
    for (FileRef mapFile : filesToCompact.keySet()) {
      try {
        
        FileOperations fileFactory = FileOperations.getInstance();
        FileSystem fs = this.fs.getFileSystemByPath(mapFile.path());
        FileSKVIterator reader;
        
        reader = fileFactory.openReader(mapFile.path().toString(), false, fs, conf, acuTableConf);
        
        readers.add(reader);
        
        SortedKeyValueIterator<Key,Value> iter = new ProblemReportingIterator(extent.getTableId().toString(), mapFile.path().toString(), false, reader);
        
        if (filesToCompact.get(mapFile).isTimeSet()) {
          iter = new TimeSettingIterator(iter, filesToCompact.get(mapFile).getTime());
        }
        
        iters.add(iter);
        
      } catch (Throwable e) {
        
        ProblemReports.getInstance().report(new ProblemReport(extent.getTableId().toString(), ProblemType.FILE_READ, mapFile.path().toString(), e));
        
        log.warn("Some problem opening map file " + mapFile + " " + e.getMessage(), e);
        // failed to open some map file... close the ones that were opened
        for (FileSKVIterator reader : readers) {
          try {
            reader.close();
          } catch (Throwable e2) {
            log.warn("Failed to close map file", e2);
          }
        }
        
        readers.clear();
        
        // rethrow IOExceptions as-is; wrap anything else so callers see IOException
        if (e instanceof IOException)
          throw (IOException) e;
        throw new IOException("Failed to open map data files", e);
      }
    }
    
    return iters;
  }
-  
  /**
   * Compacts one locality group: merges the input files (plus the in-memory
   * map for minor compactions), applies deletion handling and the configured
   * iterator stack, and writes the surviving entries to {@code mfw}. Readers
   * opened here are always closed before returning.
   *
   * @param lgName locality group to start in the writer; ignored when {@code inclusive} is false
   * @param columnFamilies families to include (inclusive) or exclude (not inclusive)
   * @param inclusive true for a named locality group, false for the default group
   * @param mfw open output writer positioned before this group
   * @param majCStats accumulator updated with this group's read/write counts
   * @throws CompactionCanceledException if the environment disables the compaction mid-group
   */
  private void compactLocalityGroup(String lgName, Set<ByteSequence> columnFamilies, boolean inclusive, FileSKVWriter mfw, CompactionStats majCStats)
      throws IOException, CompactionCanceledException {
    ArrayList<FileSKVIterator> readers = new ArrayList<FileSKVIterator>(filesToCompact.size());
    Span span = Trace.start("compact");
    try {
      long entriesCompacted = 0;
      List<SortedKeyValueIterator<Key,Value>> iters = openMapDataFiles(lgName, readers);
      
      // minor compactions also read from the in-memory map being flushed
      if (imm != null) {
        iters.add(imm.compactionIterator());
      }
      
      // count entries read, drop (or keep) deletes, and allow column family skipping
      CountingIterator citr = new CountingIterator(new MultiIterator(iters, extent.toDataRange()), entriesRead);
      DeletingIterator delIter = new DeletingIterator(citr, propogateDeletes);
      ColumnFamilySkippingIterator cfsi = new ColumnFamilySkippingIterator(delIter);
      

      // build the iterator environment matching this compaction's scope
      TabletIteratorEnvironment iterEnv;
      if (env.getIteratorScope() == IteratorScope.majc)
        iterEnv = new TabletIteratorEnvironment(IteratorScope.majc, !propogateDeletes, acuTableConf);
      else if (env.getIteratorScope() == IteratorScope.minc)
        iterEnv = new TabletIteratorEnvironment(IteratorScope.minc, acuTableConf);
      else
        throw new IllegalArgumentException();
      
      SortedKeyValueIterator<Key,Value> itr = iterEnv.getTopLevelIterator(IteratorUtil.loadIterators(env.getIteratorScope(), cfsi, extent, acuTableConf,
          iterators, iterEnv));
      
      itr.seek(extent.toDataRange(), columnFamilies, inclusive);
      
      if (!inclusive) {
        mfw.startDefaultLocalityGroup();
      } else {
        mfw.startNewLocalityGroup(lgName, columnFamilies);
      }
      
      Span write = Trace.start("write");
      try {
        while (itr.hasTop() && env.isCompactionEnabled()) {
          mfw.append(itr.getTopKey(), itr.getTopValue());
          itr.next();
          entriesCompacted++;
          
          if (entriesCompacted % 1024 == 0) {
            // Periodically update stats, do not want to do this too often since its volatile
            entriesWritten.addAndGet(1024);
          }
        }

        // the loop above exits early when the compaction was disabled; clean up
        // the partial output and signal cancellation to the caller
        if (itr.hasTop() && !env.isCompactionEnabled()) {
          // cancel major compaction operation
          try {
            try {
              mfw.close();
            } catch (IOException e) {
              log.error(e, e);
            }
            fs.deleteRecursively(outputFile.path());
          } catch (Exception e) {
            log.warn("Failed to delete Canceled compaction output file " + outputFile, e);
          }
          throw new CompactionCanceledException();
        }
        
      } finally {
        // record this group's counts even on failure/cancellation
        CompactionStats lgMajcStats = new CompactionStats(citr.getCount(), entriesCompacted);
        majCStats.add(lgMajcStats);
        write.stop();
      }
      
    } finally {
      // close sequence files opened
      for (FileSKVIterator reader : readers) {
        try {
          reader.close();
        } catch (Throwable e) {
          log.warn("Failed to close map file", e);
        }
      }
      span.stop();
    }
  }
-  
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/tabletserver/ConditionalMutationSet.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/ConditionalMutationSet.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/ConditionalMutationSet.java
deleted file mode 100644
index ffa6b77..0000000
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/ConditionalMutationSet.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.tabletserver;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.accumulo.core.data.KeyExtent;
-import org.apache.accumulo.server.data.ServerConditionalMutation;
-import org.apache.hadoop.io.WritableComparator;
-
-/**
- * 
- */
-public class ConditionalMutationSet {
-
-  static interface DeferFilter {
-    void defer(List<ServerConditionalMutation> scml, List<ServerConditionalMutation> okMutations, List<ServerConditionalMutation> deferred);
-  }
-  
-  static class DuplicateFitler implements DeferFilter {
-    public void defer(List<ServerConditionalMutation> scml, List<ServerConditionalMutation> okMutations, List<ServerConditionalMutation> deferred) {
-      okMutations.add(scml.get(0));
-      for (int i = 1; i < scml.size(); i++) {
-        if (Arrays.equals(scml.get(i - 1).getRow(), scml.get(i).getRow())) {
-          deferred.add(scml.get(i));
-        } else {
-          okMutations.add(scml.get(i));
-        }
-      }
-    }
-  }
-  
-  static void defer(Map<KeyExtent,List<ServerConditionalMutation>> updates, Map<KeyExtent,List<ServerConditionalMutation>> deferredMutations, DeferFilter filter) {
-    for (Entry<KeyExtent,List<ServerConditionalMutation>> entry : updates.entrySet()) {
-      List<ServerConditionalMutation> scml = entry.getValue();
-      List<ServerConditionalMutation> okMutations = new ArrayList<ServerConditionalMutation>(scml.size());
-      List<ServerConditionalMutation> deferred = new ArrayList<ServerConditionalMutation>();
-      filter.defer(scml, okMutations, deferred);
-      
-      if (deferred.size() > 0) {
-        scml.clear();
-        scml.addAll(okMutations);
-        List<ServerConditionalMutation> l = deferredMutations.get(entry.getKey());
-        if (l == null) {
-          l = deferred;
-          deferredMutations.put(entry.getKey(), l);
-        } else {
-          l.addAll(deferred);
-        }
-
-      }
-    }
-  }
-  
-  static void deferDuplicatesRows(Map<KeyExtent,List<ServerConditionalMutation>> updates, Map<KeyExtent,List<ServerConditionalMutation>> deferred) {
-    defer(updates, deferred, new DuplicateFitler());
-  }
-
-  static void sortConditionalMutations(Map<KeyExtent,List<ServerConditionalMutation>> updates) {
-    for (Entry<KeyExtent,List<ServerConditionalMutation>> entry : updates.entrySet()) {
-      Collections.sort(entry.getValue(), new Comparator<ServerConditionalMutation>() {
-        @Override
-        public int compare(ServerConditionalMutation o1, ServerConditionalMutation o2) {
-          return WritableComparator.compareBytes(o1.getRow(), 0, o1.getRow().length, o2.getRow(), 0, o2.getRow().length);
-        }
-      });
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/tabletserver/EndOfTableException.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/EndOfTableException.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/EndOfTableException.java
deleted file mode 100644
index 65fa538..0000000
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/EndOfTableException.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.tabletserver;
-
/**
 * Marker checked exception signaling that an operation ran past the end of a
 * table. Carries no message or cause; no throwers are visible in this file,
 * so the exact usage is defined by callers elsewhere in the tablet server.
 */
public class EndOfTableException extends Exception {
  
  // required for serializable Exception subclasses
  private static final long serialVersionUID = 1L;
  
}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/tabletserver/FileManager.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/FileManager.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/FileManager.java
deleted file mode 100644
index b73542e..0000000
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/FileManager.java
+++ /dev/null
@@ -1,562 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.tabletserver;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.Semaphore;
-
-import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.KeyExtent;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.file.FileOperations;
-import org.apache.accumulo.core.file.FileSKVIterator;
-import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
-import org.apache.accumulo.core.iterators.IteratorEnvironment;
-import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
-import org.apache.accumulo.core.iterators.system.InterruptibleIterator;
-import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator;
-import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator.DataSource;
-import org.apache.accumulo.core.iterators.system.TimeSettingIterator;
-import org.apache.accumulo.core.metadata.schema.DataFileValue;
-import org.apache.accumulo.server.conf.ServerConfiguration;
-import org.apache.accumulo.server.fs.FileRef;
-import org.apache.accumulo.server.fs.VolumeManager;
-import org.apache.accumulo.server.problems.ProblemReport;
-import org.apache.accumulo.server.problems.ProblemReportingIterator;
-import org.apache.accumulo.server.problems.ProblemReports;
-import org.apache.accumulo.server.problems.ProblemType;
-import org.apache.accumulo.server.util.time.SimpleTimer;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.log4j.Logger;
-
-public class FileManager {
-  
-  private static final Logger log = Logger.getLogger(FileManager.class);
-  
-  int maxOpen;
-  
-  private static class OpenReader implements Comparable<OpenReader> {
-    long releaseTime;
-    FileSKVIterator reader;
-    String fileName;
-    
-    public OpenReader(String fileName, FileSKVIterator reader) {
-      this.fileName = fileName;
-      this.reader = reader;
-      this.releaseTime = System.currentTimeMillis();
-    }
-    
-    @Override
-    public int compareTo(OpenReader o) {
-      if (releaseTime < o.releaseTime) {
-        return -1;
-      } else if (releaseTime > o.releaseTime) {
-        return 1;
-      } else {
-        return 0;
-      }
-    }
-    
-    @Override
-    public boolean equals(Object obj) {
-      if (obj instanceof OpenReader) {
-        return compareTo((OpenReader) obj) == 0;
-      }
-      return false;
-    }
-    
-    @Override
-    public int hashCode() {
-      return fileName.hashCode();
-    }
-  }
-  
-  private Map<String,List<OpenReader>> openFiles;
-  private HashMap<FileSKVIterator,String> reservedReaders;
-  
-  private Semaphore filePermits;
-  
-  private VolumeManager fs;
-  
-  // the data cache and index cache are allocated in
-  // TabletResourceManager and passed through the file opener to
-  // CachableBlockFile which can handle the caches being
-  // null if unallocated
-  private BlockCache dataCache = null;
-  private BlockCache indexCache = null;
-  
-  private long maxIdleTime;
-  
-  private final ServerConfiguration conf;
-  
  /**
   * Periodic task (scheduled in the FileManager constructor) that closes
   * cached readers which have sat unused longer than maxIdleTime. Eviction
   * candidates are collected under the FileManager monitor; the actual
   * close() calls happen outside it to avoid blocking other threads.
   */
  private class IdleFileCloser implements Runnable {
    
    @Override
    public void run() {
      
      long curTime = System.currentTimeMillis();
      
      ArrayList<FileSKVIterator> filesToClose = new ArrayList<FileSKVIterator>();
      
      // determine which files to close in a sync block, and then close the
      // files outside of the sync block
      synchronized (FileManager.this) {
        Iterator<Entry<String,List<OpenReader>>> iter = openFiles.entrySet().iterator();
        while (iter.hasNext()) {
          Entry<String,List<OpenReader>> entry = iter.next();
          List<OpenReader> ofl = entry.getValue();
          
          for (Iterator<OpenReader> oflIter = ofl.iterator(); oflIter.hasNext();) {
            OpenReader openReader = oflIter.next();
            
            if (curTime - openReader.releaseTime > maxIdleTime) {
              
              filesToClose.add(openReader.reader);
              oflIter.remove();
            }
          }
          
          // drop map entries whose reader list is now empty
          if (ofl.size() == 0) {
            iter.remove();
          }
        }
      }
      
      closeReaders(filesToClose);
      
    }
    
  }
-  
  /**
   * Creates a file manager that caps the number of concurrently open file
   * readers at {@code maxOpen} (enforced via a fair semaphore) and schedules a
   * periodic task to close readers idle longer than TSERV_MAX_IDLE.
   *
   * @param conf server configuration (supplies table configs and idle timeout)
   * @param fs volume manager used to resolve paths when opening files
   * @param maxOpen maximum number of simultaneously open readers; must be positive
   * @param dataCache
   *          : underlying file can and should be able to handle a null cache
   * @param indexCache
   *          : underlying file can and should be able to handle a null cache
   */
  FileManager(ServerConfiguration conf, VolumeManager fs, int maxOpen, BlockCache dataCache, BlockCache indexCache) {
    
    if (maxOpen <= 0)
      throw new IllegalArgumentException("maxOpen <= 0");
    this.conf = conf;
    this.dataCache = dataCache;
    this.indexCache = indexCache;
    
    this.filePermits = new Semaphore(maxOpen, true);
    this.maxOpen = maxOpen;
    this.fs = fs;
    
    this.openFiles = new HashMap<String,List<OpenReader>>();
    this.reservedReaders = new HashMap<FileSKVIterator,String>();
    
    this.maxIdleTime = conf.getConfiguration().getTimeInMillis(Property.TSERV_MAX_IDLE);
    SimpleTimer.getInstance().schedule(new IdleFileCloser(), maxIdleTime, maxIdleTime / 2);
    
  }
-  
-  private static int countReaders(Map<String,List<OpenReader>> files) {
-    int count = 0;
-    
-    for (List<OpenReader> list : files.values()) {
-      count += list.size();
-    }
-    
-    return count;
-  }
-  
  /**
   * Removes the {@code numToTake} least recently released readers from the
   * open-file cache and returns them so the caller can close them. Relies on
   * OpenReader's release-time ordering; called while holding the FileManager
   * monitor (see reserveReaders).
   */
  private List<FileSKVIterator> takeLRUOpenFiles(int numToTake) {
    
    ArrayList<OpenReader> openReaders = new ArrayList<OpenReader>();
    
    for (Entry<String,List<OpenReader>> entry : openFiles.entrySet()) {
      openReaders.addAll(entry.getValue());
    }
    
    // sort by release time so the oldest (LRU) readers come first
    Collections.sort(openReaders);
    
    ArrayList<FileSKVIterator> ret = new ArrayList<FileSKVIterator>();
    
    for (int i = 0; i < numToTake; i++) {
      OpenReader or = openReaders.get(i);
      
      List<OpenReader> ofl = openFiles.get(or.fileName);
      if (!ofl.remove(or)) {
        throw new RuntimeException("Failed to remove open reader that should have been there");
      }
      
      if (ofl.size() == 0) {
        openFiles.remove(or.fileName);
      }
      
      ret.add(or.reader);
    }
    
    return ret;
  }
-  
-  private static <T> List<T> getFileList(String file, Map<String,List<T>> files) {
-    List<T> ofl = files.get(file);
-    if (ofl == null) {
-      ofl = new ArrayList<T>();
-      files.put(file, ofl);
-    }
-    
-    return ofl;
-  }
-  
-  private void closeReaders(List<FileSKVIterator> filesToClose) {
-    for (FileSKVIterator reader : filesToClose) {
-      try {
-        reader.close();
-      } catch (Exception e) {
-        log.error("Failed to close file " + e.getMessage(), e);
-      }
-    }
-  }
-  
  /**
   * Reserves cached open readers for any of the requested files, moving them
   * from the open-file cache into {@code reservedFiles}/{@code readersReserved}.
   * Called with the FileManager monitor held (see reserveReaders).
   *
   * @return the subset of {@code files} that had no cached reader and still
   *         needs to be opened
   */
  private List<String> takeOpenFiles(Collection<String> files, List<FileSKVIterator> reservedFiles, Map<FileSKVIterator,String> readersReserved) {
    List<String> filesToOpen = new LinkedList<String>(files);
    for (Iterator<String> iterator = filesToOpen.iterator(); iterator.hasNext();) {
      String file = iterator.next();
      
      List<OpenReader> ofl = openFiles.get(file);
      if (ofl != null && ofl.size() > 0) {
        // take the most recently released reader for this file
        OpenReader openReader = ofl.remove(ofl.size() - 1);
        reservedFiles.add(openReader.reader);
        readersReserved.put(openReader.reader, file);
        if (ofl.size() == 0) {
          openFiles.remove(file);
        }
        iterator.remove();
      }
      
    }
    return filesToOpen;
  }
-  
  // NOTE(review): method name has a typo ("Reade" -> "Reader"); kept as-is because callers reference it.
  /** @return the file name a reserved reader was opened for, or null if the reader is not reserved. */
  private synchronized String getReservedReadeFilename(FileSKVIterator reader) {
    return reservedReaders.get(reader);
  }
-  
  /**
   * Reserves one reader per requested file, reusing cached open readers where
   * possible and evicting LRU cached readers if the open-file limit would be
   * exceeded. Blocks on the file-permit semaphore until files.size() permits
   * are available.
   *
   * @param table table id, used to look up table config and report problems
   * @param files file paths (must be URIs) to open
   * @param continueOnFailure if true, skip files that fail to open (releasing
   *        their permit); if false, close everything opened so far and throw
   * @return the reserved readers (possibly fewer than requested when
   *         continueOnFailure is true)
   */
  private List<FileSKVIterator> reserveReaders(Text table, Collection<String> files, boolean continueOnFailure) throws IOException {
    
    if (files.size() >= maxOpen) {
      throw new IllegalArgumentException("requested files exceeds max open");
    }
    
    if (files.size() == 0) {
      return Collections.emptyList();
    }
    
    List<String> filesToOpen = null;
    List<FileSKVIterator> filesToClose = Collections.emptyList();
    List<FileSKVIterator> reservedFiles = new ArrayList<FileSKVIterator>();
    Map<FileSKVIterator,String> readersReserved = new HashMap<FileSKVIterator,String>();
    
    filePermits.acquireUninterruptibly(files.size());
    
    // now that the we are past the semaphore, we have the authority
    // to open files.size() files
    
    // determine what work needs to be done in sync block
    // but do the work of opening and closing files outside
    // a synch block
    synchronized (this) {
      
      filesToOpen = takeOpenFiles(files, reservedFiles, readersReserved);
      
      int numOpen = countReaders(openFiles);
      
      // evict LRU cached readers if opening the rest would exceed maxOpen
      if (filesToOpen.size() + numOpen + reservedReaders.size() > maxOpen) {
        filesToClose = takeLRUOpenFiles((filesToOpen.size() + numOpen + reservedReaders.size()) - maxOpen);
      }
    }
    
    // close files before opening files to ensure we stay under resource
    // limitations
    closeReaders(filesToClose);
    
    // open any files that need to be opened
    for (String file : filesToOpen) {
      try {
        if (!file.contains(":"))
          throw new IllegalArgumentException("Expected uri, got : " + file);
        Path path = new Path(file);
        FileSystem ns = fs.getFileSystemByPath(path);
        //log.debug("Opening "+file + " path " + path);
        FileSKVIterator reader = FileOperations.getInstance().openReader(path.toString(), false, ns, ns.getConf(), conf.getTableConfiguration(table.toString()),
            dataCache, indexCache);
        reservedFiles.add(reader);
        readersReserved.put(reader, file);
      } catch (Exception e) {
        
        ProblemReports.getInstance().report(new ProblemReport(table.toString(), ProblemType.FILE_READ, file, e));
        
        if (continueOnFailure) {
          // release the permit for the file that failed to open
          filePermits.release(1);
          log.warn("Failed to open file " + file + " " + e.getMessage() + " continuing...");
        } else {
          // close whatever files were opened
          closeReaders(reservedFiles);
          
          filePermits.release(files.size());
          
          log.error("Failed to open file " + file + " " + e.getMessage());
          throw new IOException("Failed to open " + file, e);
        }
      }
    }
    
    synchronized (this) {
      // update set of reserved readers
      reservedReaders.putAll(readersReserved);
    }
    
    return reservedFiles;
  }
-  
  /**
   * Returns reserved readers to the open-file cache (or closes them if an
   * IOException was seen) and releases their semaphore permits.
   *
   * @param readers readers previously returned by reserveReaders; anything
   *        else triggers IllegalArgumentException
   * @param sawIOException if true (or if closing deep copies fails), the
   *        readers are closed instead of being cached for reuse
   */
  private void releaseReaders(List<FileSKVIterator> readers, boolean sawIOException) {
    // put files in openFiles
    
    synchronized (this) {
      
      // check that readers were actually reserved ... want to make sure a thread does
      // not try to release readers they never reserved
      if (!reservedReaders.keySet().containsAll(readers)) {
        throw new IllegalArgumentException("Asked to release readers that were never reserved ");
      }
      
      for (FileSKVIterator reader : readers) {
        try {
          reader.closeDeepCopies();
        } catch (IOException e) {
          log.warn(e, e);
          sawIOException = true;
        }
      }
      
      // move each reader from the reserved map back to the reuse cache,
      // unless an IOException makes it unsafe to reuse
      for (FileSKVIterator reader : readers) {
        String fileName = reservedReaders.remove(reader);
        if (!sawIOException)
          getFileList(fileName, openFiles).add(new OpenReader(fileName, reader));
      }
    }
    
    if (sawIOException)
      closeReaders(readers);
    
    // decrement the semaphore
    filePermits.release(readers.size());
    
  }
-  
  /**
   * A switchable data source wrapping a file iterator, used with
   * SourceSwitchingIterator. The underlying iterator can be unset (when the
   * backing reader is released) and later replaced; deep copies made from
   * this source are tracked so they are unset/reset in lock step.
   */
  static class FileDataSource implements DataSource {
    
    private SortedKeyValueIterator<Key,Value> iter;
    private ArrayList<FileDataSource> deepCopies;
    private boolean current = true;
    private IteratorEnvironment env;
    private String file;
    
    /** Creates the primary data source for {@code file}; starts with an empty deep-copy list. */
    FileDataSource(String file, SortedKeyValueIterator<Key,Value> iter) {
      this.file = file;
      this.iter = iter;
      this.deepCopies = new ArrayList<FileManager.FileDataSource>();
    }
    
    /** Creates a deep-copy source sharing the parent's deep-copy list, registering itself in it. */
    public FileDataSource(IteratorEnvironment env, SortedKeyValueIterator<Key,Value> deepCopy, ArrayList<FileDataSource> deepCopies) {
      this.iter = deepCopy;
      this.env = env;
      this.deepCopies = deepCopies;
      deepCopies.add(this);
    }
    
    @Override
    public boolean isCurrent() {
      return current;
    }
    
    @Override
    public DataSource getNewDataSource() {
      current = true;
      return this;
    }
    
    @Override
    public DataSource getDeepCopyDataSource(IteratorEnvironment env) {
      return new FileDataSource(env, iter.deepCopy(env), deepCopies);
    }
    
    @Override
    public SortedKeyValueIterator<Key,Value> iterator() throws IOException {
      return iter;
    }
    
    /** Marks this source and all its deep copies stale and drops their iterators. */
    void unsetIterator() {
      current = false;
      iter = null;
      for (FileDataSource fds : deepCopies) {
        fds.current = false;
        fds.iter = null;
      }
    }
    
    /**
     * Installs a replacement iterator here and a fresh deep copy in each
     * registered copy; everything is marked not-current so consumers re-seek.
     */
    void setIterator(SortedKeyValueIterator<Key,Value> iter) {
      current = false;
      this.iter = iter;
      for (FileDataSource fds : deepCopies) {
        fds.current = false;
        fds.iter = iter.deepCopy(fds.env);
      }
    }
    
  }
-  
  /**
   * Per-tablet handle onto the shared file-reader pool. Tracks the readers reserved on behalf of
   * one tablet's scan so they can be released, or temporarily returned to the pool (detach) and
   * re-acquired later (reattach) without invalidating iterators already handed to the scan.
   * <p>
   * NOTE(review): reserveReaders()/releaseReaders()/getReservedReadeFilename() are outer-class
   * members not fully visible in this view; comments about pooling semantics reflect only their
   * use within this block.
   */
  public class ScanFileManager {
    
    // switchable sources backing detachable iterators; these survive detach/reattach
    private ArrayList<FileDataSource> dataSources;
    // readers currently reserved from the shared pool for this tablet
    private ArrayList<FileSKVIterator> tabletReservedReaders;
    private KeyExtent tablet;
    // when true, a failed file open is reported but does not abort the scan
    private boolean continueOnFailure;
    
    ScanFileManager(KeyExtent tablet) {
      tabletReservedReaders = new ArrayList<FileSKVIterator>();
      dataSources = new ArrayList<FileDataSource>();
      this.tablet = tablet;
      
      continueOnFailure = conf.getTableConfiguration(tablet).getBoolean(Property.TABLE_FAILURES_IGNORE);
      
      // metadata tablets must never silently skip unreadable files
      if (tablet.isMeta()) {
        continueOnFailure = false;
      }
    }
    
    // Convert FileRefs to path strings before reserving readers from the pool.
    private List<FileSKVIterator> openFileRefs(Collection<FileRef> files) throws TooManyFilesException, IOException {
      List<String> strings = new ArrayList<String>(files.size());
      for (FileRef ref : files)
        strings.add(ref.path().toString());
      return openFiles(strings);
    }
    
    private List<FileSKVIterator> openFiles(Collection<String> files) throws TooManyFilesException, IOException {
      // one tablet can not open more than maxOpen files, otherwise it could get stuck
      // forever waiting on itself to release files
      
      // note: >= (not >) — a single tablet is capped at maxOpen - 1 reserved readers
      if (tabletReservedReaders.size() + files.size() >= maxOpen) {
        throw new TooManyFilesException("Request to open files would exceed max open files reservedReaders.size()=" + tabletReservedReaders.size()
            + " files.size()=" + files.size() + " maxOpen=" + maxOpen + " tablet = " + tablet);
      }
      
      List<FileSKVIterator> newlyReservedReaders = reserveReaders(tablet.getTableId(), files, continueOnFailure);
      
      tabletReservedReaders.addAll(newlyReservedReaders);
      return newlyReservedReaders;
    }
    
    /**
     * Opens the given files and wraps each reader for scan use.
     *
     * @param detachable
     *          when true each reader is wrapped in a SourceSwitchingIterator backed by a
     *          FileDataSource, allowing detach()/reattach() to swap the underlying reader without
     *          disturbing the returned iterators
     */
    synchronized List<InterruptibleIterator> openFiles(Map<FileRef,DataFileValue> files, boolean detachable) throws IOException {
      
      List<FileSKVIterator> newlyReservedReaders = openFileRefs(files.keySet());
      
      ArrayList<InterruptibleIterator> iters = new ArrayList<InterruptibleIterator>();
      
      for (FileSKVIterator reader : newlyReservedReaders) {
        String filename = getReservedReadeFilename(reader);
        InterruptibleIterator iter;
        if (detachable) {
          FileDataSource fds = new FileDataSource(filename, reader);
          dataSources.add(fds);
          SourceSwitchingIterator ssi = new SourceSwitchingIterator(fds);
          iter = new ProblemReportingIterator(tablet.getTableId().toString(), filename, continueOnFailure, ssi);
        } else {
          iter = new ProblemReportingIterator(tablet.getTableId().toString(), filename, continueOnFailure, reader);
        }
        DataFileValue value = files.get(new FileRef(filename));
        // apply the stored timestamp to entries from files whose time was set at minor compaction
        if (value.isTimeSet()) {
          iter = new TimeSettingIterator(iter, value.getTime());
        }
        
        iters.add(iter);
      }
      
      return iters;
    }
    
    /**
     * Returns all reserved readers to the pool while remembering which files they were for; the
     * detachable iterators go stale (unsetIterator) until reattach() re-opens the files.
     */
    synchronized void detach() {
      
      releaseReaders(tabletReservedReaders, false);
      tabletReservedReaders.clear();
      
      for (FileDataSource fds : dataSources)
        fds.unsetIterator();
    }
    
    /**
     * Re-opens the files released by detach() and re-wires each data source to a fresh reader.
     * The per-filename list handles the same file being open for multiple data sources.
     */
    synchronized void reattach() throws IOException {
      if (tabletReservedReaders.size() != 0)
        throw new IllegalStateException();
      
      Collection<String> files = new ArrayList<String>();
      for (FileDataSource fds : dataSources)
        files.add(fds.file);
      
      List<FileSKVIterator> newlyReservedReaders = openFiles(files);
      Map<String,List<FileSKVIterator>> map = new HashMap<String,List<FileSKVIterator>>();
      for (FileSKVIterator reader : newlyReservedReaders) {
        String fileName = getReservedReadeFilename(reader);
        List<FileSKVIterator> list = map.get(fileName);
        if (list == null) {
          list = new LinkedList<FileSKVIterator>();
          map.put(fileName, list);
        }
        
        list.add(reader);
      }
      
      // hand out one reader per data source, consuming the per-file lists
      for (FileDataSource fds : dataSources) {
        FileSKVIterator reader = map.get(fds.file).remove(0);
        fds.setIterator(reader);
      }
    }
    
    /**
     * Releases everything held for this scan, e.g. when the scan session ends or fails.
     *
     * @param sawIOException
     *          passed through to releaseReaders; presumably tells the pool not to cache/reuse the
     *          readers — confirm against the outer-class implementation
     */
    synchronized void releaseOpenFiles(boolean sawIOException) {
      releaseReaders(tabletReservedReaders, sawIOException);
      tabletReservedReaders.clear();
      dataSources.clear();
    }
    
    synchronized int getNumOpenFiles() {
      return tabletReservedReaders.size();
    }
  }
-  
-  public ScanFileManager newScanFileManager(KeyExtent tablet) {
-    return new ScanFileManager(tablet);
-  }
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/tabletserver/HoldTimeoutException.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/HoldTimeoutException.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/HoldTimeoutException.java
deleted file mode 100644
index a14db23..0000000
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/HoldTimeoutException.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.tabletserver;
-
/**
 * Thrown when an operation gives up waiting on a tablet-server hold — presumably the
 * commit-hold applied under memory pressure; confirm against callers in the tabletserver
 * package.
 */
public class HoldTimeoutException extends RuntimeException {
  private static final long serialVersionUID = 1L;

  /**
   * @param why description of why the hold timed out
   */
  public HoldTimeoutException(String why) {
    super(why);
  }

  /**
   * Backward-compatible addition: preserves the underlying failure so callers wrapping another
   * exception do not lose the cause chain.
   *
   * @param why description of why the hold timed out
   * @param cause the underlying failure that triggered this exception
   */
  public HoldTimeoutException(String why, Throwable cause) {
    super(why, cause);
  }
}


Mime
View raw message