accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ctubb...@apache.org
Subject [40/54] [partial] ACCUMULO-658, ACCUMULO-656 Split server into separate modules
Date Fri, 01 Nov 2013 00:56:19 GMT
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java b/server/base/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java
new file mode 100644
index 0000000..66e68c3
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.zookeeper;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
+import org.apache.accumulo.server.util.time.SimpleTimer;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+
+/**
+ * Provides a way to push work out to tablet servers via zookeeper and wait for that work to be done. Any tablet server can pick up a work item and process it.
+ * 
+ * Worker processes watch a zookeeper node for tasks to be performed. After getting an exclusive lock on the node, the worker will perform the task.
+ */
+public class DistributedWorkQueue {
+  
+  // Reserved child name under the work node; holds the per-item lock nodes and is never a work id.
+  private static final String LOCKS_NODE = "locks";
+
+  private static final Logger log = Logger.getLogger(DistributedWorkQueue.class);
+  
+  // Executes claimed work items; its core pool size doubles as the max number of items claimed at once.
+  private ThreadPoolExecutor threadPool;
+  private ZooReaderWriter zoo = ZooReaderWriter.getInstance();
+  // Zookeeper node whose children are the queued work items (plus the reserved "locks" child).
+  private String path;
+
+  // Number of tasks currently submitted to threadPool and not yet finished.
+  private AtomicInteger numTask = new AtomicInteger(0);
+
+  /*
+   * Scan the given child listing for unclaimed work. Children are shuffled so that workers do not
+   * all contend for the same item. For each candidate an ephemeral lock node is created; on success
+   * the item is handed to the thread pool. No-op when there is no work or the pool is already full.
+   */
+  private void lookForWork(final Processor processor, List<String> children) {
+    if (children.size() == 0)
+      return;
+    
+    if (numTask.get() >= threadPool.getCorePoolSize())
+      return;
+    
+    Random random = new Random();
+    Collections.shuffle(children, random);
+    try {
+      for (final String child : children) {
+        
+        // skip the reserved locks subtree; it is not a work item
+        if (child.equals(LOCKS_NODE))
+          continue;
+
+        final String lockPath = path + "/locks/" + child;
+
+        try {
+          // no need to use zoolock, because a queue (ephemeral sequential) is not needed
+          // if can not get the lock right now then do not want to wait
+          zoo.putEphemeralData(lockPath, new byte[0]);
+        } catch (NodeExistsException nee) {
+          // someone else has reserved it
+          continue;
+        }
+
+        final String childPath = path + "/" + child;
+        
+        // check to see if another node processed it already
+        if (!zoo.exists(childPath)) {
+          zoo.recursiveDelete(lockPath, NodeMissingPolicy.SKIP);
+          continue;
+        }
+
+        // Great... we got the lock, but maybe we're too busy
+        if (numTask.get() >= threadPool.getCorePoolSize()) {
+          zoo.recursiveDelete(lockPath, NodeMissingPolicy.SKIP);
+          break;
+        }
+        
+        log.debug("got lock for " + child);
+        
+        Runnable task = new Runnable() {
+
+          @Override
+          public void run() {
+            try {
+              try {
+                // a fresh processor is obtained per work item via the Processor factory method
+                processor.newProcessor().process(child, zoo.getData(childPath, null));
+                
+                // if the task fails, then its entry in the Q is not deleted... so it will be retried
+                try {
+                  zoo.recursiveDelete(childPath, NodeMissingPolicy.SKIP);
+                } catch (Exception e) {
+                  log.error("Error received when trying to delete entry in zookeeper " + childPath, e);
+                }
+                
+              } catch (Exception e) {
+                log.warn("Failed to process work " + child, e);
+              }
+              
+              // release the lock node regardless of whether processing succeeded
+              try {
+                zoo.recursiveDelete(lockPath, NodeMissingPolicy.SKIP);
+              } catch (Exception e) {
+                log.error("Error received when trying to delete entry in zookeeper " + childPath, e);
+              }
+
+            } finally {
+              numTask.decrementAndGet();
+            }
+            
+            try {
+              // its important that this is called after numTask is decremented
+              lookForWork(processor, zoo.getChildren(path));
+            } catch (KeeperException e) {
+              log.error("Failed to look for work", e);
+            } catch (InterruptedException e) {
+              log.info("Interrupted looking for work", e);
+            }
+          }
+        };
+        
+        // incremented before submission so the capacity checks above count in-flight tasks
+        numTask.incrementAndGet();
+        threadPool.execute(task);
+
+      }
+    } catch (Throwable t) {
+      // never propagate; this can run on a zookeeper watcher or timer thread
+      log.error("Unexpected error", t);
+    }
+  }
+
+  /**
+   * Callback for processing claimed work items. newProcessor() supplies the instance on which
+   * process() is invoked for each item; process() receives the work id and the bytes stored in its
+   * zookeeper node. If process() throws, the item remains queued and will be retried.
+   */
+  public interface Processor {
+    Processor newProcessor();
+
+    void process(String workID, byte[] data);
+  }
+  
+  // path: the zookeeper node under which work items are queued
+  public DistributedWorkQueue(String path) {
+    this.path = path;
+  }
+  
+  /**
+   * Begin watching for and processing work: creates the work and locks nodes, registers a child
+   * watcher, performs an initial scan, and schedules a periodic re-scan as a safety net in case a
+   * watch event is missed.
+   */
+  public void startProcessing(final Processor processor, ThreadPoolExecutor executorService) throws KeeperException, InterruptedException {
+    
+    threadPool = (ThreadPoolExecutor) executorService;
+
+    zoo.mkdirs(path);
+    zoo.mkdirs(path + "/" + LOCKS_NODE);
+
+    List<String> children = zoo.getChildren(path, new Watcher() {
+      @Override
+      public void process(WatchedEvent event) {
+        switch (event.getType()) {
+          case NodeChildrenChanged:
+            if (event.getPath().equals(path))
+              try {
+                // re-registers this watcher while fetching the updated child list
+                lookForWork(processor, zoo.getChildren(path, this));
+              } catch (KeeperException e) {
+                log.error("Failed to look for work", e);
+              } catch (InterruptedException e) {
+                log.info("Interrupted looking for work", e);
+              }
+            else
+              log.info("Unexpected path for NodeChildrenChanged event " + event.getPath());
+            break;
+          case NodeCreated:
+          case NodeDataChanged:
+          case NodeDeleted:
+          case None:
+            log.info("Got unexpected zookeeper event: " + event.getType() + " for " + path);
+            break;
+        
+        }
+      }
+    });
+    
+    lookForWork(processor, children);
+    
+    Random r = new Random();
+    // Add a little jitter to avoid all the tservers slamming zookeeper at once
+    SimpleTimer.getInstance().schedule(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          lookForWork(processor, zoo.getChildren(path));
+        } catch (KeeperException e) {
+          log.error("Failed to look for work", e);
+        } catch (InterruptedException e) {
+          log.info("Interrupted looking for work", e);
+        }
+      }
+    }, r.nextInt(60 * 1000), 60 * 1000);
+  }
+  
+  /**
+   * Queue a work item. Idempotent for an existing id (NodeExistsPolicy.SKIP); the reserved id
+   * "locks" is rejected.
+   */
+  public void addWork(String workId, byte[] data) throws KeeperException, InterruptedException {
+    if (workId.equalsIgnoreCase(LOCKS_NODE))
+      throw new IllegalArgumentException("locks is reserved work id");
+
+    zoo.mkdirs(path);
+    zoo.putPersistentData(path + "/" + workId, data, NodeExistsPolicy.SKIP);
+  }
+  
+  // Returns the ids of work items still queued, excluding the reserved locks node.
+  public List<String> getWorkQueued() throws KeeperException, InterruptedException {
+    ArrayList<String> children = new ArrayList<String>(zoo.getChildren(path));
+    children.remove(LOCKS_NODE);
+    return children;
+  }
+
+  /**
+   * Block until none of the given work ids remain as children of the work node. A child watcher
+   * wakes the waiter on changes; the wait also times out every 10 seconds as a fallback re-check.
+   */
+  public void waitUntilDone(Set<String> workIDs) throws KeeperException, InterruptedException {
+    
+    // new String() guarantees a unique monitor object (not a shared interned literal)
+    final String condVar = new String("cond");
+    
+    Watcher watcher = new Watcher() {
+      @Override
+      public void process(WatchedEvent event) {
+        switch (event.getType()) {
+          case NodeChildrenChanged:
+            synchronized (condVar) {
+              condVar.notify();
+            }
+            break;
+          case NodeCreated:
+          case NodeDataChanged:
+          case NodeDeleted:
+          case None:
+            log.info("Got unexpected zookeeper event: " + event.getType() + " for " + path);
+            break;
+        
+        }
+      }
+    };
+    
+    List<String> children = zoo.getChildren(path, watcher);
+    
+    while (!Collections.disjoint(children, workIDs)) {
+      synchronized (condVar) {
+        condVar.wait(10000);
+      }
+      children = zoo.getChildren(path, watcher);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/zookeeper/TransactionWatcher.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/zookeeper/TransactionWatcher.java b/server/base/src/main/java/org/apache/accumulo/server/zookeeper/TransactionWatcher.java
new file mode 100644
index 0000000..4e0e977
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/zookeeper/TransactionWatcher.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.zookeeper;
+
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.ZooReader;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Server-side TransactionWatcher wired to a zookeeper-backed Arbitrator. Transaction state is kept
+ * as nodes under &lt;root&gt;/&lt;type&gt;/&lt;tid&gt;, with a companion "&lt;tid&gt;-running" node
+ * marking a not-yet-complete transaction.
+ */
+public class TransactionWatcher extends org.apache.accumulo.fate.zookeeper.TransactionWatcher {
+  public static class ZooArbitrator implements Arbitrator {
+    
+    Instance instance = HdfsZooInstance.getInstance();
+    ZooReader rdr = new ZooReader(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut());
+    
+    // A transaction is alive while its <tid> node exists; sync() first for an up-to-date view.
+    @Override
+    public boolean transactionAlive(String type, long tid) throws Exception {
+      String path = ZooUtil.getRoot(instance) + "/" + type + "/" + tid;
+      rdr.sync(path);
+      return rdr.exists(path);
+    }
+    
+    // Mark a transaction started: (re)create the type, tid, and tid-running nodes.
+    public static void start(String type, long tid) throws KeeperException, InterruptedException {
+      Instance instance = HdfsZooInstance.getInstance();
+      IZooReaderWriter writer = ZooReaderWriter.getInstance();
+      writer.putPersistentData(ZooUtil.getRoot(instance) + "/" + type, new byte[] {}, NodeExistsPolicy.OVERWRITE);
+      writer.putPersistentData(ZooUtil.getRoot(instance) + "/" + type + "/" + tid, new byte[] {}, NodeExistsPolicy.OVERWRITE);
+      writer.putPersistentData(ZooUtil.getRoot(instance) + "/" + type + "/" + tid + "-running", new byte[] {}, NodeExistsPolicy.OVERWRITE);
+    }
+    
+    // Mark a transaction no longer alive: remove only its tid node (tid-running is left for cleanup()).
+    public static void stop(String type, long tid) throws KeeperException, InterruptedException {
+      Instance instance = HdfsZooInstance.getInstance();
+      IZooReaderWriter writer = ZooReaderWriter.getInstance();
+      writer.recursiveDelete(ZooUtil.getRoot(instance) + "/" + type + "/" + tid, NodeMissingPolicy.SKIP);
+    }
+    
+    // Remove all trace of a transaction: both the tid node and its -running marker.
+    public static void cleanup(String type, long tid) throws KeeperException, InterruptedException {
+      Instance instance = HdfsZooInstance.getInstance();
+      IZooReaderWriter writer = ZooReaderWriter.getInstance();
+      writer.recursiveDelete(ZooUtil.getRoot(instance) + "/" + type + "/" + tid, NodeMissingPolicy.SKIP);
+      writer.recursiveDelete(ZooUtil.getRoot(instance) + "/" + type + "/" + tid + "-running", NodeMissingPolicy.SKIP);
+    }
+
+    // A transaction is complete once its "-running" marker node is gone.
+    @Override
+    public boolean transactionComplete(String type, long tid) throws Exception {
+      String path = ZooUtil.getRoot(instance) + "/" + type + "/" + tid + "-running";
+      rdr.sync(path);
+      return !rdr.exists(path);
+    }
+  }
+  
+  public TransactionWatcher() {
+    super(new ZooArbitrator());
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/zookeeper/ZooCache.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/zookeeper/ZooCache.java b/server/base/src/main/java/org/apache/accumulo/server/zookeeper/ZooCache.java
new file mode 100644
index 0000000..bf34ef6
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/zookeeper/ZooCache.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.zookeeper;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.zookeeper.Watcher;
+
+/**
+ * Server-side ZooCache whose no-arg/watcher constructors default to the shared singleton
+ * ZooReaderWriter connection.
+ */
+public class ZooCache extends org.apache.accumulo.fate.zookeeper.ZooCache {
+  // Cache with no watcher, backed by the shared connection.
+  public ZooCache() {
+    this(null);
+  }
+  
+  // Cache backed by the shared ZooReaderWriter instance, notifying the given watcher (may be null).
+  public ZooCache(Watcher watcher) {
+    super(ZooReaderWriter.getInstance(), watcher);
+  }
+  
+  // Cache with its own connection built from the configured zookeeper host list and session timeout.
+  public ZooCache(AccumuloConfiguration conf, Watcher watcher) {
+    super(conf.get(Property.INSTANCE_ZK_HOST), (int) conf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT), watcher);
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/zookeeper/ZooLock.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/zookeeper/ZooLock.java b/server/base/src/main/java/org/apache/accumulo/server/zookeeper/ZooLock.java
new file mode 100644
index 0000000..dce6d38
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/zookeeper/ZooLock.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.zookeeper;
+
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Server-side ZooLock: convenience subclass that wires the fate ZooLock to a fresh ZooCache and
+ * the singleton ZooReaderWriter, so callers only supply the lock path.
+ */
+public class ZooLock extends org.apache.accumulo.fate.zookeeper.ZooLock {
+  
+  public ZooLock(String path) {
+    super(new ZooCache(), ZooReaderWriter.getInstance(), path);
+  }
+  
+  // Delete the lock at path, delegating to the fate ZooLock static with the singleton writer.
+  public static void deleteLock(String path) throws InterruptedException, KeeperException {
+    deleteLock(ZooReaderWriter.getInstance(), path);
+  }
+  
+  // Conditional variant: delegates with lockData; returns whatever the fate implementation reports.
+  public static boolean deleteLock(String path, String lockData) throws InterruptedException, KeeperException {
+    return deleteLock(ZooReaderWriter.getInstance(), path, lockData);
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/zookeeper/ZooQueueLock.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/zookeeper/ZooQueueLock.java b/server/base/src/main/java/org/apache/accumulo/server/zookeeper/ZooQueueLock.java
new file mode 100644
index 0000000..93a0460
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/zookeeper/ZooQueueLock.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.zookeeper;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+
+import org.apache.accumulo.fate.zookeeper.DistributedReadWriteLock;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Server-side ZooQueueLock that always uses the retrying ZooReaderWriter instance, so queue-lock
+ * operations survive transient zookeeper connection loss.
+ */
+public class ZooQueueLock extends org.apache.accumulo.fate.zookeeper.ZooQueueLock {
+  
+  // path: zookeeper node for the lock queue; ephemeral: whether queue entries die with the session.
+  public ZooQueueLock(String path, boolean ephemeral) throws KeeperException, InterruptedException {
+    super(ZooReaderWriter.getRetryingInstance(), path, ephemeral);
+  }
+  
+  /*
+   * Ad-hoc manual exercise of DistributedReadWriteLock on top of this queue lock (requires a live
+   * zookeeper): verifies a write lock cannot be taken while read locks are held and vice versa,
+   * throwing RuntimeException on violation and printing "success" otherwise.
+   */
+  public static void main(String args[]) throws InterruptedException, KeeperException {
+    ZooQueueLock lock = new ZooQueueLock("/lock", true);
+    DistributedReadWriteLock rlocker = new DistributedReadWriteLock(lock, "reader".getBytes());
+    DistributedReadWriteLock wlocker = new DistributedReadWriteLock(lock, "wlocker".getBytes());
+    final Lock readLock = rlocker.readLock();
+    readLock.lock();
+    final Lock readLock2 = rlocker.readLock();
+    readLock2.lock();
+    final Lock writeLock = wlocker.writeLock();
+    if (writeLock.tryLock(100, TimeUnit.MILLISECONDS))
+      throw new RuntimeException("Write lock achieved during read lock!");
+    readLock.unlock();
+    readLock2.unlock();
+    writeLock.lock();
+    if (readLock.tryLock(100, TimeUnit.MILLISECONDS))
+      throw new RuntimeException("Read lock achieved during write lock!");
+    // recover the held write lock by its data and release it so the final read lock can proceed
+    final Lock writeLock2 = DistributedReadWriteLock.recoverLock(lock, "wlocker".getBytes());
+    writeLock2.unlock();
+    readLock.lock();
+    System.out.println("success");
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/zookeeper/ZooReaderWriter.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/zookeeper/ZooReaderWriter.java b/server/base/src/main/java/org/apache/accumulo/server/zookeeper/ZooReaderWriter.java
new file mode 100644
index 0000000..70ba661
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/zookeeper/ZooReaderWriter.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.zookeeper;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.fate.util.UtilWaitThread;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Server-side ZooReaderWriter: authenticates to zookeeper with digest credentials
+ * "accumulo:&lt;instance secret&gt;" and exposes two lazily-created singletons — a plain instance
+ * and a dynamic-proxy wrapper that retries on connection loss.
+ */
+public class ZooReaderWriter extends org.apache.accumulo.fate.zookeeper.ZooReaderWriter {
+  private static final String SCHEME = "digest";
+  private static final String USER = "accumulo";
+  private static ZooReaderWriter instance = null;
+  private static IZooReaderWriter retryingInstance = null;
+  
+  public ZooReaderWriter(String string, int timeInMillis, String secret) {
+    super(string, timeInMillis, SCHEME, (USER + ":" + secret).getBytes());
+  }
+  
+  // Lazily builds the singleton from the site configuration (zookeeper hosts, timeout, secret).
+  public static synchronized ZooReaderWriter getInstance() {
+    if (instance == null) {
+      AccumuloConfiguration conf = ServerConfiguration.getSiteConfiguration();
+      instance = new ZooReaderWriter(conf.get(Property.INSTANCE_ZK_HOST), (int) conf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT),
+          conf.get(Property.INSTANCE_SECRET));
+    }
+    return instance;
+  }
+  
+  /**
+   * get an instance that retries when zookeeper connection errors occur
+   * 
+   * @return an instance that retries when Zookeeper connection errors occur.
+   */
+  public static synchronized IZooReaderWriter getRetryingInstance() {
+    
+    if (retryingInstance == null) {
+      final IZooReaderWriter inst = getInstance();
+      
+      // Proxy handler: forward every call to the plain instance; on ConnectionLossException sleep
+      // and retry forever, backing off 250ms per attempt up to a 5s cap. Other causes are rethrown.
+      InvocationHandler ih = new InvocationHandler() {
+        @Override
+        public Object invoke(Object obj, Method method, Object[] args) throws Throwable {
+          long retryTime = 250;
+          while (true) {
+            try {
+              return method.invoke(inst, args);
+            } catch (InvocationTargetException e) {
+              if (e.getCause() instanceof KeeperException.ConnectionLossException) {
+                Logger.getLogger(ZooReaderWriter.class).warn("Error connecting to zookeeper, will retry in " + retryTime, e.getCause());
+                UtilWaitThread.sleep(retryTime);
+                retryTime = Math.min(5000, retryTime + 250);
+              } else {
+                // unwrap the reflective wrapper so callers see the real exception type
+                throw e.getCause();
+              }
+            }
+          }
+        }
+      };
+      
+      retryingInstance = (IZooReaderWriter) Proxy.newProxyInstance(ZooReaderWriter.class.getClassLoader(), new Class[] {IZooReaderWriter.class}, ih);
+    }
+    
+    return retryingInstance;
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java
----------------------------------------------------------------------
diff --git a/server/base/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java b/server/base/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java
new file mode 100644
index 0000000..3680341
--- /dev/null
+++ b/server/base/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.client;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.impl.TabletLocator;
+import org.apache.accumulo.core.client.impl.TabletLocator.TabletLocation;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.file.FileSKVWriter;
+import org.apache.accumulo.core.security.Credentials;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.fs.VolumeManagerImpl;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class BulkImporterTest {
+  
+  // Fake tablet metadata for table "1": split points a, b, bm, c, cm, ..., l, plus a final tablet
+  // with a null end row covering everything after "l".
+  static final SortedSet<KeyExtent> fakeMetaData = new TreeSet<KeyExtent>();
+  static final Text tableId = new Text("1");
+  static {
+    fakeMetaData.add(new KeyExtent(tableId, new Text("a"), null));
+    for (String part : new String[] {"b", "bm", "c", "cm", "d", "dm", "e", "em", "f", "g", "h", "i", "j", "k", "l"}) {
+      fakeMetaData.add(new KeyExtent(tableId, new Text(part), fakeMetaData.last().getEndRow()));
+    }
+    fakeMetaData.add(new KeyExtent(tableId, null, fakeMetaData.last().getEndRow()));
+  }
+  
+  // Locator that resolves rows against fakeMetaData (always "localhost") and counts single-extent
+  // cache invalidations; the remaining operations are intentionally unimplemented.
+  class MockTabletLocator extends TabletLocator {
+    int invalidated = 0;
+    
+    // Returns the first fake tablet whose extent sorts at or after the requested row.
+    @Override
+    public TabletLocation locateTablet(Credentials credentials, Text row, boolean skipRow, boolean retry) throws AccumuloException, AccumuloSecurityException,
+        TableNotFoundException {
+      return new TabletLocation(fakeMetaData.tailSet(new KeyExtent(tableId, row, null)).first(), "localhost", "1");
+    }
+    
+    @Override
+    public <T extends Mutation> void binMutations(Credentials credentials, List<T> mutations, Map<String,TabletServerMutations<T>> binnedMutations, List<T> failures)
+        throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
+      throw new NotImplementedException();
+    }
+    
+    @Override
+    public List<Range> binRanges(Credentials credentials, List<Range> ranges, Map<String,Map<KeyExtent,List<Range>>> binnedRanges) throws AccumuloException,
+        AccumuloSecurityException, TableNotFoundException {
+      throw new NotImplementedException();
+    }
+    
+    @Override
+    public void invalidateCache(KeyExtent failedExtent) {
+      invalidated++;
+    }
+    
+    @Override
+    public void invalidateCache(Collection<KeyExtent> keySet) {
+      throw new NotImplementedException();
+    }
+    
+    @Override
+    public void invalidateCache() {
+      throw new NotImplementedException();
+    }
+    
+    @Override
+    public void invalidateCache(String server) {
+      throw new NotImplementedException();
+    }
+  }
+  
+  /*
+   * Writes an rfile whose keys fall in rows "a", "d"/"dd", several "i..." rows, and "xyzzy", then
+   * checks that findOverlappingTablets returns exactly the fake tablets covering those rows — first
+   * for the whole file, then restricted to the (b, h] extent — and that the locator cache was
+   * invalidated exactly once.
+   */
+  @Test
+  public void testFindOverlappingTablets() throws Exception {
+    Credentials credentials = null;
+    MockTabletLocator locator = new MockTabletLocator();
+    FileSystem fs = FileSystem.getLocal(CachedConfiguration.getInstance());
+    AccumuloConfiguration acuConf = AccumuloConfiguration.getDefaultConfiguration();
+    String file = "target/testFile.rf";
+    // remove any leftover file from a previous run
+    fs.delete(new Path(file), true);
+    FileSKVWriter writer = FileOperations.getInstance().openWriter(file, fs, fs.getConf(), acuConf);
+    writer.startDefaultLocalityGroup();
+    Value empty = new Value(new byte[] {});
+    writer.append(new Key("a", "cf", "cq"), empty);
+    writer.append(new Key("a", "cf", "cq1"), empty);
+    writer.append(new Key("a", "cf", "cq2"), empty);
+    writer.append(new Key("a", "cf", "cq3"), empty);
+    writer.append(new Key("a", "cf", "cq4"), empty);
+    writer.append(new Key("a", "cf", "cq5"), empty);
+    writer.append(new Key("d", "cf", "cq"), empty);
+    writer.append(new Key("d", "cf", "cq1"), empty);
+    writer.append(new Key("d", "cf", "cq2"), empty);
+    writer.append(new Key("d", "cf", "cq3"), empty);
+    writer.append(new Key("d", "cf", "cq4"), empty);
+    writer.append(new Key("d", "cf", "cq5"), empty);
+    writer.append(new Key("dd", "cf", "cq1"), empty);
+    writer.append(new Key("ichabod", "cf", "cq"), empty);
+    writer.append(new Key("icky", "cf", "cq1"), empty);
+    writer.append(new Key("iffy", "cf", "cq2"), empty);
+    writer.append(new Key("internal", "cf", "cq3"), empty);
+    writer.append(new Key("is", "cf", "cq4"), empty);
+    writer.append(new Key("iterator", "cf", "cq5"), empty);
+    writer.append(new Key("xyzzy", "cf", "cq"), empty);
+    writer.close();
+    VolumeManager vm = VolumeManagerImpl.get(acuConf);
+    // whole-file scan: five distinct tablets overlap the file's row ranges
+    List<TabletLocation> overlaps = BulkImporter.findOverlappingTablets(acuConf, vm, locator, new Path(file), credentials);
+    Assert.assertEquals(5, overlaps.size());
+    Collections.sort(overlaps);
+    Assert.assertEquals(new KeyExtent(tableId, new Text("a"), null), overlaps.get(0).tablet_extent);
+    Assert.assertEquals(new KeyExtent(tableId, new Text("d"), new Text("cm")), overlaps.get(1).tablet_extent);
+    Assert.assertEquals(new KeyExtent(tableId, new Text("dm"), new Text("d")), overlaps.get(2).tablet_extent);
+    Assert.assertEquals(new KeyExtent(tableId, new Text("j"), new Text("i")), overlaps.get(3).tablet_extent);
+    Assert.assertEquals(new KeyExtent(tableId, null, new Text("l")), overlaps.get(4).tablet_extent);
+    
+    // same file restricted to the (b, h] extent: only three of those tablets remain
+    List<TabletLocation> overlaps2 = BulkImporter.findOverlappingTablets(acuConf, vm, locator, new Path(file), new KeyExtent(tableId, new Text("h"), new Text(
+        "b")), credentials);
+    Assert.assertEquals(3, overlaps2.size());
+    Assert.assertEquals(new KeyExtent(tableId, new Text("d"), new Text("cm")), overlaps2.get(0).tablet_extent);
+    Assert.assertEquals(new KeyExtent(tableId, new Text("dm"), new Text("d")), overlaps2.get(1).tablet_extent);
+    Assert.assertEquals(new KeyExtent(tableId, new Text("j"), new Text("i")), overlaps2.get(2).tablet_extent);
+    Assert.assertEquals(locator.invalidated, 1);
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/test/java/org/apache/accumulo/server/constraints/MetadataConstraintsTest.java
----------------------------------------------------------------------
diff --git a/server/base/src/test/java/org/apache/accumulo/server/constraints/MetadataConstraintsTest.java b/server/base/src/test/java/org/apache/accumulo/server/constraints/MetadataConstraintsTest.java
new file mode 100644
index 0000000..d0b2e9e
--- /dev/null
+++ b/server/base/src/test/java/org/apache/accumulo/server/constraints/MetadataConstraintsTest.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.constraints;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import java.util.List;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
+import org.apache.accumulo.fate.zookeeper.TransactionWatcher.Arbitrator;
+import org.apache.accumulo.server.constraints.MetadataConstraints;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.junit.Test;
+
+/**
+ * Tests for {@link MetadataConstraints}: metadata row format checks and bulk load (loaded marker)
+ * transaction checks. Violation codes are defined by MetadataConstraints; see that class for
+ * their meanings.
+ */
+public class MetadataConstraintsTest {
+  
+  // Constraint wired to a fake arbitrator instead of ZooKeeper: txids 5 and 7 are alive,
+  // txid 9 throws from transactionAlive(), everything else is complete.
+  static class TestMetadataConstraints extends MetadataConstraints {
+    @Override
+    protected Arbitrator getArbitrator() {
+      return new Arbitrator() {
+        
+        @Override
+        public boolean transactionAlive(String type, long tid) throws Exception {
+          if (tid == 9)
+            throw new RuntimeException("txid 9 reserved for future use");
+          return tid == 5 || tid == 7;
+        }
+        
+        @Override
+        public boolean transactionComplete(String type, long tid) throws Exception {
+          return tid != 5 && tid != 7;
+        }
+      };
+    }
+  }
+  
+  @Test
+  public void testCheck() {
+    Logger.getLogger(AccumuloConfiguration.class).setLevel(Level.ERROR);
+    // prev-row value that does not parse - expect violation 3
+    Mutation m = new Mutation(new Text("0;foo"));
+    TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.put(m, new Value("1foo".getBytes()));
+    
+    MetadataConstraints mc = new MetadataConstraints();
+    
+    List<Short> violations = mc.check(null, m);
+    
+    assertNotNull(violations);
+    assertEquals(1, violations.size());
+    assertEquals(Short.valueOf((short) 3), violations.get(0));
+    
+    // malformed metadata row ("0:foo") - expect violation 4
+    m = new Mutation(new Text("0:foo"));
+    TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.put(m, new Value("1poo".getBytes()));
+    
+    violations = mc.check(null, m);
+    
+    assertNotNull(violations);
+    assertEquals(1, violations.size());
+    assertEquals(Short.valueOf((short) 4), violations.get(0));
+    
+    // unknown column family - expect violation 2
+    m = new Mutation(new Text("0;foo"));
+    m.put(new Text("bad_column_name"), new Text(""), new Value("e".getBytes()));
+    
+    violations = mc.check(null, m);
+    
+    assertNotNull(violations);
+    assertEquals(1, violations.size());
+    assertEquals(Short.valueOf((short) 2), violations.get(0));
+    
+    // bad row AND bad prev-row - expect violations 4 and 5
+    m = new Mutation(new Text("!!<"));
+    TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.put(m, new Value("1poo".getBytes()));
+    
+    violations = mc.check(null, m);
+    
+    assertNotNull(violations);
+    assertEquals(2, violations.size());
+    assertEquals(Short.valueOf((short) 4), violations.get(0));
+    assertEquals(Short.valueOf((short) 5), violations.get(1));
+    
+    // empty prev-row value - expect violation 6
+    m = new Mutation(new Text("0;foo"));
+    TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.put(m, new Value("".getBytes()));
+    
+    violations = mc.check(null, m);
+    
+    assertNotNull(violations);
+    assertEquals(1, violations.size());
+    assertEquals(Short.valueOf((short) 6), violations.get(0));
+    
+    // well-formed mutation - no violations
+    m = new Mutation(new Text("0;foo"));
+    TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.put(m, new Value("bar".getBytes()));
+    
+    violations = mc.check(null, m);
+    
+    assertNull(violations);
+    
+    // well-formed default-tablet mutation - no violations
+    m = new Mutation(new Text("!0<"));
+    TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.put(m, new Value("bar".getBytes()));
+    
+    violations = mc.check(null, m);
+    
+    assertNull(violations);
+    
+    // "!1<" is not a valid metadata row - expect violation 4
+    m = new Mutation(new Text("!1<"));
+    TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.put(m, new Value("bar".getBytes()));
+    
+    violations = mc.check(null, m);
+    
+    assertNotNull(violations);
+    assertEquals(1, violations.size());
+    assertEquals(Short.valueOf((short) 4), violations.get(0));
+    
+  }
+  
+  @Test
+  public void testBulkFileCheck() {
+    MetadataConstraints mc = new TestMetadataConstraints();
+    Mutation m;
+    List<Short> violations;
+    
+    // inactive txid
+    m = new Mutation(new Text("0;foo"));
+    m.put(TabletsSection.BulkFileColumnFamily.NAME, new Text("/someFile"), new Value("12345".getBytes()));
+    m.put(DataFileColumnFamily.NAME, new Text("/someFile"), new Value("1,1".getBytes()));
+    violations = mc.check(null, m);
+    assertNotNull(violations);
+    assertEquals(1, violations.size());
+    assertEquals(Short.valueOf((short) 8), violations.get(0));
+    
+    // txid that throws exception
+    m = new Mutation(new Text("0;foo"));
+    m.put(TabletsSection.BulkFileColumnFamily.NAME, new Text("/someFile"), new Value("9".getBytes()));
+    m.put(DataFileColumnFamily.NAME, new Text("/someFile"), new Value("1,1".getBytes()));
+    violations = mc.check(null, m);
+    assertNotNull(violations);
+    assertEquals(1, violations.size());
+    assertEquals(Short.valueOf((short) 8), violations.get(0));
+    
+    // active txid w/ file
+    m = new Mutation(new Text("0;foo"));
+    m.put(TabletsSection.BulkFileColumnFamily.NAME, new Text("/someFile"), new Value("5".getBytes()));
+    m.put(DataFileColumnFamily.NAME, new Text("/someFile"), new Value("1,1".getBytes()));
+    violations = mc.check(null, m);
+    assertNull(violations);
+    
+    // active txid w/o file
+    m = new Mutation(new Text("0;foo"));
+    m.put(TabletsSection.BulkFileColumnFamily.NAME, new Text("/someFile"), new Value("5".getBytes()));
+    violations = mc.check(null, m);
+    assertNotNull(violations);
+    assertEquals(1, violations.size());
+    assertEquals(Short.valueOf((short) 8), violations.get(0));
+    
+    // two active txids w/ files
+    m = new Mutation(new Text("0;foo"));
+    m.put(TabletsSection.BulkFileColumnFamily.NAME, new Text("/someFile"), new Value("5".getBytes()));
+    m.put(DataFileColumnFamily.NAME, new Text("/someFile"), new Value("1,1".getBytes()));
+    m.put(TabletsSection.BulkFileColumnFamily.NAME, new Text("/someFile2"), new Value("7".getBytes()));
+    m.put(DataFileColumnFamily.NAME, new Text("/someFile2"), new Value("1,1".getBytes()));
+    violations = mc.check(null, m);
+    assertNotNull(violations);
+    assertEquals(1, violations.size());
+    assertEquals(Short.valueOf((short) 8), violations.get(0));
+    
+    // two files w/ one active txid
+    m = new Mutation(new Text("0;foo"));
+    m.put(TabletsSection.BulkFileColumnFamily.NAME, new Text("/someFile"), new Value("5".getBytes()));
+    m.put(DataFileColumnFamily.NAME, new Text("/someFile"), new Value("1,1".getBytes()));
+    m.put(TabletsSection.BulkFileColumnFamily.NAME, new Text("/someFile2"), new Value("5".getBytes()));
+    m.put(DataFileColumnFamily.NAME, new Text("/someFile2"), new Value("1,1".getBytes()));
+    violations = mc.check(null, m);
+    assertNull(violations);
+    
+    // two loaded w/ one active txid and one file
+    m = new Mutation(new Text("0;foo"));
+    m.put(TabletsSection.BulkFileColumnFamily.NAME, new Text("/someFile"), new Value("5".getBytes()));
+    m.put(DataFileColumnFamily.NAME, new Text("/someFile"), new Value("1,1".getBytes()));
+    m.put(TabletsSection.BulkFileColumnFamily.NAME, new Text("/someFile2"), new Value("5".getBytes()));
+    violations = mc.check(null, m);
+    assertNotNull(violations);
+    assertEquals(1, violations.size());
+    assertEquals(Short.valueOf((short) 8), violations.get(0));
+    
+    // active txid, mutation that looks like split
+    m = new Mutation(new Text("0;foo"));
+    m.put(TabletsSection.BulkFileColumnFamily.NAME, new Text("/someFile"), new Value("5".getBytes()));
+    TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.put(m, new Value("/t1".getBytes()));
+    violations = mc.check(null, m);
+    assertNull(violations);
+    
+    // inactive txid, mutation that looks like split
+    m = new Mutation(new Text("0;foo"));
+    m.put(TabletsSection.BulkFileColumnFamily.NAME, new Text("/someFile"), new Value("12345".getBytes()));
+    TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.put(m, new Value("/t1".getBytes()));
+    violations = mc.check(null, m);
+    assertNull(violations);
+    
+    // active txid, mutation that looks like a load
+    m = new Mutation(new Text("0;foo"));
+    m.put(TabletsSection.BulkFileColumnFamily.NAME, new Text("/someFile"), new Value("5".getBytes()));
+    m.put(TabletsSection.CurrentLocationColumnFamily.NAME, new Text("789"), new Value("127.0.0.1:9997".getBytes()));
+    violations = mc.check(null, m);
+    assertNull(violations);
+    
+    // inactive txid, mutation that looks like a load
+    m = new Mutation(new Text("0;foo"));
+    m.put(TabletsSection.BulkFileColumnFamily.NAME, new Text("/someFile"), new Value("12345".getBytes()));
+    m.put(TabletsSection.CurrentLocationColumnFamily.NAME, new Text("789"), new Value("127.0.0.1:9997".getBytes()));
+    violations = mc.check(null, m);
+    assertNull(violations);
+    
+    // deleting a load flag
+    m = new Mutation(new Text("0;foo"));
+    m.putDelete(TabletsSection.BulkFileColumnFamily.NAME, new Text("/someFile"));
+    violations = mc.check(null, m);
+    assertNull(violations);
+    
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/test/java/org/apache/accumulo/server/data/ServerMutationTest.java
----------------------------------------------------------------------
diff --git a/server/base/src/test/java/org/apache/accumulo/server/data/ServerMutationTest.java b/server/base/src/test/java/org/apache/accumulo/server/data/ServerMutationTest.java
new file mode 100644
index 0000000..0df27f1
--- /dev/null
+++ b/server/base/src/test/java/org/apache/accumulo/server/data/ServerMutationTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.data;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.List;
+
+import org.apache.accumulo.core.data.ColumnUpdate;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.junit.Test;
+
+/**
+ * Tests {@link ServerMutation}: application of the system timestamp to updates that were put
+ * without an explicit timestamp, and round-tripping through Writable serialization via
+ * {@link ReflectionUtils#copy}.
+ */
+public class ServerMutationTest {
+  
+  @Test
+  public void test() throws Exception {
+    ServerMutation m = new ServerMutation(new Text("r1"));
+    m.put(new Text("cf1"), new Text("cq1"), new Value("v1".getBytes()));
+    m.put(new Text("cf2"), new Text("cq2"), 56, new Value("v2".getBytes()));
+    m.setSystemTimestamp(42);
+    
+    List<ColumnUpdate> updates = m.getUpdates();
+    
+    assertEquals(2, updates.size());
+    
+    assertEquals("r1", new String(m.getRow()));
+    ColumnUpdate cu = updates.get(0);
+    
+    assertEquals("cf1", new String(cu.getColumnFamily()));
+    assertEquals("cq1", new String(cu.getColumnQualifier()));
+    assertEquals("", new String(cu.getColumnVisibility()));
+    // no explicit timestamp was set on this update, so the system timestamp shows through
+    assertFalse(cu.hasTimestamp());
+    assertEquals(42L, cu.getTimestamp());
+    
+    // round-trip through Writable serialization and verify the copy matches the original
+    ServerMutation m2 = new ServerMutation();
+    ReflectionUtils.copy(CachedConfiguration.getInstance(), m, m2);
+    
+    updates = m2.getUpdates();
+    
+    assertEquals(2, updates.size());
+    assertEquals("r1", new String(m2.getRow()));
+    
+    cu = updates.get(0);
+    assertEquals("cf1", new String(cu.getColumnFamily()));
+    assertEquals("cq1", new String(cu.getColumnQualifier()));
+    assertFalse(cu.hasTimestamp());
+    assertEquals(42L, cu.getTimestamp());
+    
+    cu = updates.get(1);
+    
+    assertEquals("r1", new String(m2.getRow()));
+    assertEquals("cf2", new String(cu.getColumnFamily()));
+    assertEquals("cq2", new String(cu.getColumnQualifier()));
+    // this update was put with an explicit timestamp (56), which survives the copy
+    assertTrue(cu.hasTimestamp());
+    assertEquals(56L, cu.getTimestamp());
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/test/java/org/apache/accumulo/server/iterators/MetadataBulkLoadFilterTest.java
----------------------------------------------------------------------
diff --git a/server/base/src/test/java/org/apache/accumulo/server/iterators/MetadataBulkLoadFilterTest.java b/server/base/src/test/java/org/apache/accumulo/server/iterators/MetadataBulkLoadFilterTest.java
new file mode 100644
index 0000000..4a45e99
--- /dev/null
+++ b/server/base/src/test/java/org/apache/accumulo/server/iterators/MetadataBulkLoadFilterTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.iterators;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.TreeMap;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.SortedMapIterator;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
+import org.apache.accumulo.core.util.ColumnFQ;
+import org.apache.accumulo.fate.zookeeper.TransactionWatcher.Arbitrator;
+import org.apache.accumulo.server.iterators.MetadataBulkLoadFilter;
+import org.apache.hadoop.io.Text;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests {@link MetadataBulkLoadFilter}, which filters bulk-load marker entries out of the
+ * metadata table when their bulk import transaction has completed, while keeping markers whose
+ * transaction is still alive or whose status cannot be determined.
+ */
+public class MetadataBulkLoadFilterTest {
+  // Fake arbitrator: txid 5 is alive; txid 9 throws from transactionComplete();
+  // txid 7 is neither alive nor complete; all other txids are complete.
+  static class TestArbitrator implements Arbitrator {
+    @Override
+    public boolean transactionAlive(String type, long tid) throws Exception {
+      return tid == 5;
+    }
+    
+    @Override
+    public boolean transactionComplete(String type, long tid) throws Exception {
+      if (tid == 9)
+        throw new RuntimeException();
+      return tid != 5 && tid != 7;
+    }
+  }
+  
+  // Filter under test, wired to the fake arbitrator above instead of the real one.
+  static class TestMetadataBulkLoadFilter extends MetadataBulkLoadFilter {
+    @Override
+    protected Arbitrator getArbitrator() {
+      return new TestArbitrator();
+    }
+  }
+  
+  // Adds a ColumnFQ-addressed entry to the sorted map backing the source iterator.
+  private static void put(TreeMap<Key,Value> tm, String row, ColumnFQ cfq, String val) {
+    Key k = new Key(new Text(row), cfq.getColumnFamily(), cfq.getColumnQualifier());
+    tm.put(k, new Value(val.getBytes()));
+  }
+  
+  // Adds a family/qualifier entry; a null val produces a delete entry instead.
+  private static void put(TreeMap<Key,Value> tm, String row, Text cf, String cq, String val) {
+    Key k = new Key(new Text(row), cf, new Text(cq));
+    if (val == null) {
+      k.setDeleted(true);
+      tm.put(k, new Value("".getBytes()));
+    } else
+      tm.put(k, new Value(val.getBytes()));
+  }
+  
+  @Test
+  public void testBasic() throws IOException {
+    TreeMap<Key,Value> tm1 = new TreeMap<Key,Value>();
+    TreeMap<Key,Value> expected = new TreeMap<Key,Value>();
+    
+    // following should not be deleted by filter
+    // (markers for txid 5 are alive, 7 is not complete, 9 errors; the delete entry passes through)
+    put(tm1, "2;m", TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN, "/t1");
+    put(tm1, "2;m", DataFileColumnFamily.NAME, "/t1/file1", "1,1");
+    put(tm1, "2;m", TabletsSection.BulkFileColumnFamily.NAME, "/t1/file1", "5");
+    put(tm1, "2;m", TabletsSection.BulkFileColumnFamily.NAME, "/t1/file3", "7");
+    put(tm1, "2;m", TabletsSection.BulkFileColumnFamily.NAME, "/t1/file4", "9");
+    put(tm1, "2<", TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN, "/t2");
+    put(tm1, "2<", DataFileColumnFamily.NAME, "/t2/file2", "1,1");
+    put(tm1, "2<", TabletsSection.BulkFileColumnFamily.NAME, "/t2/file6", "5");
+    put(tm1, "2<", TabletsSection.BulkFileColumnFamily.NAME, "/t2/file7", "7");
+    put(tm1, "2<", TabletsSection.BulkFileColumnFamily.NAME, "/t2/file8", "9");
+    put(tm1, "2<", TabletsSection.BulkFileColumnFamily.NAME, "/t2/fileC", null);
+    
+    // snapshot now: everything added so far must survive the filter
+    expected.putAll(tm1);
+    
+    // the following should be deleted by filter (txids 8 and 2 are complete)
+    put(tm1, "2;m", TabletsSection.BulkFileColumnFamily.NAME, "/t1/file5", "8");
+    put(tm1, "2<", TabletsSection.BulkFileColumnFamily.NAME, "/t2/file9", "8");
+    put(tm1, "2<", TabletsSection.BulkFileColumnFamily.NAME, "/t2/fileA", "2");
+    
+    TestMetadataBulkLoadFilter iter = new TestMetadataBulkLoadFilter();
+    // minimal environment: reports majc scope and "not a full major compaction"
+    iter.init(new SortedMapIterator(tm1), new HashMap<String,String>(), new IteratorEnvironment() {
+      
+      @Override
+      public SortedKeyValueIterator<Key,Value> reserveMapFileReader(String mapFileName) throws IOException {
+        return null;
+      }
+      
+      @Override
+      public void registerSideChannel(SortedKeyValueIterator<Key,Value> iter) {}
+      
+      @Override
+      public boolean isFullMajorCompaction() {
+        return false;
+      }
+      
+      @Override
+      public IteratorScope getIteratorScope() {
+        return IteratorScope.majc;
+      }
+      
+      @Override
+      public AccumuloConfiguration getConfig() {
+        return null;
+      }
+    });
+    
+    iter.seek(new Range(), new ArrayList<ByteSequence>(), false);
+    
+    // drain the filtered iterator and compare against the snapshot
+    TreeMap<Key,Value> actual = new TreeMap<Key,Value>();
+    
+    while (iter.hasTop()) {
+      actual.put(iter.getTopKey(), iter.getTopValue());
+      iter.next();
+    }
+    
+    Assert.assertEquals(expected, actual);
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/test/java/org/apache/accumulo/server/master/balancer/ChaoticLoadBalancerTest.java
----------------------------------------------------------------------
diff --git a/server/base/src/test/java/org/apache/accumulo/server/master/balancer/ChaoticLoadBalancerTest.java b/server/base/src/test/java/org/apache/accumulo/server/master/balancer/ChaoticLoadBalancerTest.java
new file mode 100644
index 0000000..2dff03d
--- /dev/null
+++ b/server/base/src/test/java/org/apache/accumulo/server/master/balancer/ChaoticLoadBalancerTest.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.master.balancer;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.master.thrift.TableInfo;
+import org.apache.accumulo.core.master.thrift.TabletServerStatus;
+import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
+import org.apache.accumulo.server.master.balancer.ChaoticLoadBalancer;
+import org.apache.accumulo.server.master.state.TServerInstance;
+import org.apache.accumulo.server.master.state.TabletMigration;
+import org.apache.hadoop.io.Text;
+import org.apache.thrift.TException;
+import org.junit.Test;
+
+import com.google.common.net.HostAndPort;
+
+/**
+ * Tests {@link ChaoticLoadBalancer}. The balancer behaves randomly, so these tests only verify
+ * that every tablet receives an assignment and that balancing eventually produces migrations,
+ * not any particular placement.
+ */
+public class ChaoticLoadBalancerTest {
+  
+  // In-memory stand-in for a tablet server: just a list of hosted extents plus a
+  // synthesized TabletServerStatus derived from them.
+  class FakeTServer {
+    List<KeyExtent> extents = new ArrayList<KeyExtent>();
+    
+    TabletServerStatus getStatus(TServerInstance server) {
+      TabletServerStatus result = new TabletServerStatus();
+      result.tableMap = new HashMap<String,TableInfo>();
+      for (KeyExtent extent : extents) {
+        String table = extent.getTableId().toString();
+        TableInfo info = result.tableMap.get(table);
+        if (info == null)
+          result.tableMap.put(table, info = new TableInfo());
+        info.onlineTablets++;
+        info.recs = info.onlineTablets;
+        info.ingestRate = 123.;
+        info.queryRate = 456.;
+      }
+      return result;
+    }
+  }
+  
+  Map<TServerInstance,FakeTServer> servers = new HashMap<TServerInstance,FakeTServer>();
+  
+  // Balancer under test, wired to read tablet stats from the fake servers map
+  // instead of contacting real tablet servers.
+  class TestChaoticLoadBalancer extends ChaoticLoadBalancer {
+    
+    @Override
+    public List<TabletStats> getOnlineTabletsForTable(TServerInstance tserver, String table) throws ThriftSecurityException, TException {
+      List<TabletStats> result = new ArrayList<TabletStats>();
+      for (KeyExtent extent : servers.get(tserver).extents) {
+        if (extent.getTableId().toString().equals(table)) {
+          result.add(new TabletStats(extent.toThrift(), null, null, null, 0L, 0., 0., 0));
+        }
+      }
+      return result;
+    }
+  }
+  
+  @Test
+  public void testAssignMigrations() {
+    servers.clear();
+    servers.put(new TServerInstance(HostAndPort.fromParts("127.0.0.1", 1234), "a"), new FakeTServer());
+    servers.put(new TServerInstance(HostAndPort.fromParts("127.0.0.1", 1235), "b"), new FakeTServer());
+    servers.put(new TServerInstance(HostAndPort.fromParts("127.0.0.1", 1236), "c"), new FakeTServer());
+    Map<KeyExtent,TServerInstance> metadataTable = new TreeMap<KeyExtent,TServerInstance>();
+    String table = "t1";
+    metadataTable.put(makeExtent(table, null, null), null);
+    table = "t2";
+    metadataTable.put(makeExtent(table, "a", null), null);
+    metadataTable.put(makeExtent(table, null, "a"), null);
+    table = "t3";
+    metadataTable.put(makeExtent(table, "a", null), null);
+    metadataTable.put(makeExtent(table, "b", "a"), null);
+    metadataTable.put(makeExtent(table, "c", "b"), null);
+    metadataTable.put(makeExtent(table, "d", "c"), null);
+    metadataTable.put(makeExtent(table, "e", "d"), null);
+    metadataTable.put(makeExtent(table, null, "e"), null);
+    
+    TestChaoticLoadBalancer balancer = new TestChaoticLoadBalancer();
+    
+    Map<KeyExtent,TServerInstance> assignments = new HashMap<KeyExtent,TServerInstance>();
+    balancer.getAssignments(getAssignments(servers), metadataTable, assignments);
+    
+    // every unassigned tablet must receive an assignment (placement itself is random)
+    assertEquals(metadataTable.size(), assignments.size());
+  }
+  
+  // Builds the sorted server->status map the balancer API expects from the fake servers.
+  SortedMap<TServerInstance,TabletServerStatus> getAssignments(Map<TServerInstance,FakeTServer> servers) {
+    SortedMap<TServerInstance,TabletServerStatus> result = new TreeMap<TServerInstance,TabletServerStatus>();
+    for (Entry<TServerInstance,FakeTServer> entry : servers.entrySet()) {
+      result.put(entry.getKey(), entry.getValue().getStatus(entry.getKey()));
+    }
+    return result;
+  }
+  
+  @Test
+  public void testUnevenAssignment() {
+    servers.clear();
+    for (char c : "abcdefghijklmnopqrstuvwxyz".toCharArray()) {
+      String cString = Character.toString(c);
+      HostAndPort fakeAddress = HostAndPort.fromParts("127.0.0.1", c);
+      String fakeInstance = cString;
+      TServerInstance tsi = new TServerInstance(fakeAddress, fakeInstance);
+      FakeTServer fakeTServer = new FakeTServer();
+      servers.put(tsi, fakeTServer);
+      fakeTServer.extents.add(makeExtent(cString, null, null));
+    }
+    // Put more tablets on one server, but not more than the number of servers
+    Entry<TServerInstance,FakeTServer> first = servers.entrySet().iterator().next();
+    first.getValue().extents.add(makeExtent("newTable", "a", null));
+    first.getValue().extents.add(makeExtent("newTable", "b", "a"));
+    first.getValue().extents.add(makeExtent("newTable", "c", "b"));
+    first.getValue().extents.add(makeExtent("newTable", "d", "c"));
+    first.getValue().extents.add(makeExtent("newTable", "e", "d"));
+    first.getValue().extents.add(makeExtent("newTable", "f", "e"));
+    first.getValue().extents.add(makeExtent("newTable", "g", "f"));
+    first.getValue().extents.add(makeExtent("newTable", "h", "g"));
+    first.getValue().extents.add(makeExtent("newTable", "i", null));
+    TestChaoticLoadBalancer balancer = new TestChaoticLoadBalancer();
+    Set<KeyExtent> migrations = Collections.emptySet();
+    
+    // Just want to make sure it gets some migrations; randomness prevents a guarantee of any
+    // defined (or even expected) amount, so keep balancing until at least one is produced.
+    // (The previous "!= 0" condition made this loop dead code, so the test verified nothing.)
+    List<TabletMigration> migrationsOut = new ArrayList<TabletMigration>();
+    while (migrationsOut.size() == 0) {
+      balancer.balance(getAssignments(servers), migrations, migrationsOut);
+    }
+  }
+  
+  private static KeyExtent makeExtent(String table, String end, String prev) {
+    return new KeyExtent(new Text(table), toText(end), toText(prev));
+  }
+  
+  // null-safe String -> Text conversion (null means an unbounded end/prev row)
+  private static Text toText(String value) {
+    if (value != null)
+      return new Text(value);
+    return null;
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/test/java/org/apache/accumulo/server/master/balancer/DefaultLoadBalancerTest.java
----------------------------------------------------------------------
diff --git a/server/base/src/test/java/org/apache/accumulo/server/master/balancer/DefaultLoadBalancerTest.java b/server/base/src/test/java/org/apache/accumulo/server/master/balancer/DefaultLoadBalancerTest.java
new file mode 100644
index 0000000..a471a84
--- /dev/null
+++ b/server/base/src/test/java/org/apache/accumulo/server/master/balancer/DefaultLoadBalancerTest.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.master.balancer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.master.thrift.TableInfo;
+import org.apache.accumulo.core.master.thrift.TabletServerStatus;
+import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
+import org.apache.accumulo.server.master.balancer.DefaultLoadBalancer;
+import org.apache.accumulo.server.master.state.TServerInstance;
+import org.apache.accumulo.server.master.state.TabletMigration;
+import org.apache.hadoop.io.Text;
+import org.apache.thrift.TException;
+import org.junit.Test;
+
+import com.google.common.net.HostAndPort;
+
+public class DefaultLoadBalancerTest {
+  
+  class FakeTServer {
+    List<KeyExtent> extents = new ArrayList<KeyExtent>();
+    
+    TabletServerStatus getStatus(TServerInstance server) {
+      TabletServerStatus result = new TabletServerStatus();
+      result.tableMap = new HashMap<String,TableInfo>();
+      for (KeyExtent extent : extents) {
+        String table = extent.getTableId().toString();
+        TableInfo info = result.tableMap.get(table);
+        if (info == null)
+          result.tableMap.put(table, info = new TableInfo());
+        info.onlineTablets++;
+        info.recs = info.onlineTablets;
+        info.ingestRate = 123.;
+        info.queryRate = 456.;
+      }
+      return result;
+    }
+  }
+  
+  Map<TServerInstance,FakeTServer> servers = new HashMap<TServerInstance,FakeTServer>();
+  
+  class TestDefaultLoadBalancer extends DefaultLoadBalancer {
+    
+    @Override
+    public List<TabletStats> getOnlineTabletsForTable(TServerInstance tserver, String table) throws ThriftSecurityException, TException {
+      List<TabletStats> result = new ArrayList<TabletStats>();
+      for (KeyExtent extent : servers.get(tserver).extents) {
+        if (extent.getTableId().toString().equals(table)) {
+          result.add(new TabletStats(extent.toThrift(), null, null, null, 0l, 0., 0., 0));
+        }
+      }
+      return result;
+    }
+  }
+  
+  @Test
+  public void testAssignMigrations() {
+    servers.clear();
+    servers.put(new TServerInstance(HostAndPort.fromParts("127.0.0.1", 1234), "a"), new FakeTServer());
+    servers.put(new TServerInstance(HostAndPort.fromParts("127.0.0.1", 1235), "b"), new FakeTServer());
+    servers.put(new TServerInstance(HostAndPort.fromParts("127.0.0.1", 1236), "c"), new FakeTServer());
+    List<KeyExtent> metadataTable = new ArrayList<KeyExtent>();
+    String table = "t1";
+    metadataTable.add(makeExtent(table, null, null));
+    table = "t2";
+    metadataTable.add(makeExtent(table, "a", null));
+    metadataTable.add(makeExtent(table, null, "a"));
+    table = "t3";
+    metadataTable.add(makeExtent(table, "a", null));
+    metadataTable.add(makeExtent(table, "b", "a"));
+    metadataTable.add(makeExtent(table, "c", "b"));
+    metadataTable.add(makeExtent(table, "d", "c"));
+    metadataTable.add(makeExtent(table, "e", "d"));
+    metadataTable.add(makeExtent(table, null, "e"));
+    Collections.sort(metadataTable);
+    
+    TestDefaultLoadBalancer balancer = new TestDefaultLoadBalancer();
+    
+    SortedMap<TServerInstance,TabletServerStatus> current = new TreeMap<TServerInstance,TabletServerStatus>();
+    for (Entry<TServerInstance,FakeTServer> entry : servers.entrySet()) {
+      current.put(entry.getKey(), entry.getValue().getStatus(entry.getKey()));
+    }
+    assignTablets(metadataTable, servers, current, balancer);
+    
+    // Verify that the counts on the tables are correct
+    Map<String,Integer> expectedCounts = new HashMap<String,Integer>();
+    expectedCounts.put("t1", 1);
+    expectedCounts.put("t2", 1);
+    expectedCounts.put("t3", 2);
+    checkBalance(metadataTable, servers, expectedCounts);
+    
+    // Rebalance once
+    for (Entry<TServerInstance,FakeTServer> entry : servers.entrySet()) {
+      current.put(entry.getKey(), entry.getValue().getStatus(entry.getKey()));
+    }
+    
+    // Nothing should happen, we are balanced
+    ArrayList<TabletMigration> out = new ArrayList<TabletMigration>();
+    balancer.getMigrations(current, out);
+    assertEquals(out.size(), 0);
+    
+    // Take down a tabletServer
+    TServerInstance first = current.keySet().iterator().next();
+    current.remove(first);
+    FakeTServer remove = servers.remove(first);
+    
+    // reassign offline extents
+    assignTablets(remove.extents, servers, current, balancer);
+    checkBalance(metadataTable, servers, null);
+  }
+  
+  private void assignTablets(List<KeyExtent> metadataTable, Map<TServerInstance,FakeTServer> servers, SortedMap<TServerInstance,TabletServerStatus> status,
+      TestDefaultLoadBalancer balancer) {
+    // Assign tablets
+    for (KeyExtent extent : metadataTable) {
+      TServerInstance assignment = balancer.getAssignment(status, extent, null);
+      assertNotNull(assignment);
+      assertFalse(servers.get(assignment).extents.contains(extent));
+      servers.get(assignment).extents.add(extent);
+    }
+  }
+  
+  SortedMap<TServerInstance,TabletServerStatus> getAssignments(Map<TServerInstance,FakeTServer> servers) {
+    SortedMap<TServerInstance,TabletServerStatus> result = new TreeMap<TServerInstance,TabletServerStatus>();
+    for (Entry<TServerInstance,FakeTServer> entry : servers.entrySet()) {
+      result.put(entry.getKey(), entry.getValue().getStatus(entry.getKey()));
+    }
+    return result;
+  }
+  
+  @Test
+  public void testUnevenAssignment() {
+    servers.clear();
+    for (char c : "abcdefghijklmnopqrstuvwxyz".toCharArray()) {
+      String cString = Character.toString(c);
+      HostAndPort fakeAddress = HostAndPort.fromParts("127.0.0.1", (int) c);
+      String fakeInstance = cString;
+      TServerInstance tsi = new TServerInstance(fakeAddress, fakeInstance);
+      FakeTServer fakeTServer = new FakeTServer();
+      servers.put(tsi, fakeTServer);
+      fakeTServer.extents.add(makeExtent(cString, null, null));
+    }
+    // Put more tablets on one server, but not more than the number of servers
+    Entry<TServerInstance,FakeTServer> first = servers.entrySet().iterator().next();
+    first.getValue().extents.add(makeExtent("newTable", "a", null));
+    first.getValue().extents.add(makeExtent("newTable", "b", "a"));
+    first.getValue().extents.add(makeExtent("newTable", "c", "b"));
+    first.getValue().extents.add(makeExtent("newTable", "d", "c"));
+    first.getValue().extents.add(makeExtent("newTable", "e", "d"));
+    first.getValue().extents.add(makeExtent("newTable", "f", "e"));
+    first.getValue().extents.add(makeExtent("newTable", "g", "f"));
+    first.getValue().extents.add(makeExtent("newTable", "h", "g"));
+    first.getValue().extents.add(makeExtent("newTable", "i", null));
+    TestDefaultLoadBalancer balancer = new TestDefaultLoadBalancer();
+    Set<KeyExtent> migrations = Collections.emptySet();
+    int moved = 0;
+    // balance until we can't balance no more!
+    while (true) {
+      List<TabletMigration> migrationsOut = new ArrayList<TabletMigration>();
+      balancer.balance(getAssignments(servers), migrations, migrationsOut);
+      if (migrationsOut.size() == 0)
+        break;
+      for (TabletMigration migration : migrationsOut) {
+        if (servers.get(migration.oldServer).extents.remove(migration.tablet))
+          moved++;
+        servers.get(migration.newServer).extents.add(migration.tablet);
+      }
+    }
+    assertEquals(8, moved);
+  }
+  
+  @Test
+  public void testUnevenAssignment2() {
+    servers.clear();
+    // make 26 servers
+    for (char c : "abcdefghijklmnopqrstuvwxyz".toCharArray()) {
+      String cString = Character.toString(c);
+      HostAndPort fakeAddress = HostAndPort.fromParts("127.0.0.1", (int) c);
+      String fakeInstance = cString;
+      TServerInstance tsi = new TServerInstance(fakeAddress, fakeInstance);
+      FakeTServer fakeTServer = new FakeTServer();
+      servers.put(tsi, fakeTServer);
+    }
+    // put 60 tablets on 25 of them
+    List<Entry<TServerInstance,FakeTServer>> shortList = new ArrayList<Entry<TServerInstance,FakeTServer>>(servers.entrySet());
+    Entry<TServerInstance,FakeTServer> shortServer = shortList.remove(0);
+    int c = 0;
+    for (int i = 0; i < 60; i++) {
+      for (Entry<TServerInstance,FakeTServer> entry : shortList) {
+        entry.getValue().extents.add(makeExtent("t" + c, null, null));
+      }
+    }
+    // put 10 on the that short server:
+    for (int i = 0; i < 10; i++) {
+      shortServer.getValue().extents.add(makeExtent("s" + i, null, null));
+    }
+    
+    TestDefaultLoadBalancer balancer = new TestDefaultLoadBalancer();
+    Set<KeyExtent> migrations = Collections.emptySet();
+    int moved = 0;
+    // balance until we can't balance no more!
+    while (true) {
+      List<TabletMigration> migrationsOut = new ArrayList<TabletMigration>();
+      balancer.balance(getAssignments(servers), migrations, migrationsOut);
+      if (migrationsOut.size() == 0)
+        break;
+      for (TabletMigration migration : migrationsOut) {
+        if (servers.get(migration.oldServer).extents.remove(migration.tablet))
+          moved++;
+        servers.get(migration.newServer).extents.add(migration.tablet);
+      }
+    }
+    // average is 58, with 2 at 59: we need 48 more moved to the short server
+    assertEquals(48, moved);
+  }
+  
+  private void checkBalance(List<KeyExtent> metadataTable, Map<TServerInstance,FakeTServer> servers, Map<String,Integer> expectedCounts) {
+    // Verify they are spread evenly over the cluster
+    int average = metadataTable.size() / servers.size();
+    for (FakeTServer server : servers.values()) {
+      int diff = server.extents.size() - average;
+      if (diff < 0)
+        fail("average number of tablets is " + average + " but a server has " + server.extents.size());
+      if (diff > 1)
+        fail("average number of tablets is " + average + " but a server has " + server.extents.size());
+    }
+    
+    if (expectedCounts != null) {
+      for (FakeTServer server : servers.values()) {
+        Map<String,Integer> counts = new HashMap<String,Integer>();
+        for (KeyExtent extent : server.extents) {
+          String t = extent.getTableId().toString();
+          if (counts.get(t) == null)
+            counts.put(t, 0);
+          counts.put(t, counts.get(t) + 1);
+        }
+        for (Entry<String,Integer> entry : counts.entrySet()) {
+          assertEquals(expectedCounts.get(entry.getKey()), counts.get(entry.getKey()));
+        }
+      }
+    }
+  }
+  
+  private static KeyExtent makeExtent(String table, String end, String prev) {
+    return new KeyExtent(new Text(table), toText(end), toText(prev));
+  }
+  
+  private static Text toText(String value) {
+    if (value != null)
+      return new Text(value);
+    return null;
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/test/java/org/apache/accumulo/server/master/balancer/TableLoadBalancerTest.java
----------------------------------------------------------------------
diff --git a/server/base/src/test/java/org/apache/accumulo/server/master/balancer/TableLoadBalancerTest.java b/server/base/src/test/java/org/apache/accumulo/server/master/balancer/TableLoadBalancerTest.java
new file mode 100644
index 0000000..82e5885
--- /dev/null
+++ b/server/base/src/test/java/org/apache/accumulo/server/master/balancer/TableLoadBalancerTest.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.master.balancer;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.master.thrift.TableInfo;
+import org.apache.accumulo.core.master.thrift.TabletServerStatus;
+import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
+import org.apache.accumulo.server.master.state.TServerInstance;
+import org.apache.accumulo.server.master.state.TabletMigration;
+import org.apache.hadoop.io.Text;
+import org.apache.thrift.TException;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.net.HostAndPort;
+
+public class TableLoadBalancerTest {
+  
+  static private TServerInstance mkts(String address, String session) throws Exception {
+    return new TServerInstance(HostAndPort.fromParts(address, 1234), session);
+  }
+  
+  static private TabletServerStatus status(Object... config) {
+    TabletServerStatus result = new TabletServerStatus();
+    result.tableMap = new HashMap<String,TableInfo>();
+    String tablename = null;
+    for (Object c : config) {
+      if (c instanceof String) {
+        tablename = (String) c;
+      } else {
+        TableInfo info = new TableInfo();
+        int count = (Integer) c;
+        info.onlineTablets = count;
+        info.tablets = count;
+        result.tableMap.put(tablename, info);
+      }
+    }
+    return result;
+  }
+  
+  static MockInstance instance = new MockInstance("mockamatic");
+  
+  static SortedMap<TServerInstance,TabletServerStatus> state;
+  
+  static List<TabletStats> generateFakeTablets(TServerInstance tserver, String tableId) {
+    List<TabletStats> result = new ArrayList<TabletStats>();
+    TabletServerStatus tableInfo = state.get(tserver);
+    // generate some fake tablets
+    for (int i = 0; i < tableInfo.tableMap.get(tableId).onlineTablets; i++) {
+      TabletStats stats = new TabletStats();
+      stats.extent = new KeyExtent(new Text(tableId), new Text(tserver.host() + String.format("%03d", i + 1)), new Text(tserver.host()
+          + String.format("%03d", i))).toThrift();
+      result.add(stats);
+    }
+    return result;
+  }
+  
+  static class DefaultLoadBalancer extends org.apache.accumulo.server.master.balancer.DefaultLoadBalancer {
+    
+    public DefaultLoadBalancer(String table) {
+      super(table);
+    }
+    
+    @Override
+    public List<TabletStats> getOnlineTabletsForTable(TServerInstance tserver, String tableId) throws ThriftSecurityException, TException {
+      return generateFakeTablets(tserver, tableId);
+    }
+  }
+  
+  // ugh... so wish I had provided mock objects to the LoadBalancer in the master
+  static class TableLoadBalancer extends org.apache.accumulo.server.master.balancer.TableLoadBalancer {
+    
+    TableLoadBalancer() {
+      super();
+    }
+    
+    // need to use our mock instance
+    @Override
+    protected TableOperations getTableOperations() {
+      try {
+        return instance.getConnector("user", new PasswordToken("pass")).tableOperations();
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+    
+    // use our new classname to test class loading
+    @Override
+    protected String getLoadBalancerClassNameForTable(String table) {
+      return DefaultLoadBalancer.class.getName();
+    }
+    
+    // we don't have real tablet servers to ask: invent some online tablets
+    @Override
+    public List<TabletStats> getOnlineTabletsForTable(TServerInstance tserver, String tableId) throws ThriftSecurityException, TException {
+      return generateFakeTablets(tserver, tableId);
+    }
+  }
+  
+  @Test
+  public void test() throws Exception {
+    Connector c = instance.getConnector("user", new PasswordToken("pass"));
+    c.tableOperations().create("t1");
+    c.tableOperations().create("t2");
+    c.tableOperations().create("t3");
+    state = new TreeMap<TServerInstance,TabletServerStatus>();
+    TServerInstance svr = mkts("10.0.0.1", "0x01020304");
+    state.put(svr, status("t1", 10, "t2", 10, "t3", 10));
+    
+    Set<KeyExtent> migrations = Collections.emptySet();
+    List<TabletMigration> migrationsOut = new ArrayList<TabletMigration>();
+    TableLoadBalancer tls = new TableLoadBalancer();
+    tls.balance(state, migrations, migrationsOut);
+    Assert.assertEquals(0, migrationsOut.size());
+    
+    state.put(mkts("10.0.0.2", "0x02030405"), status());
+    tls = new TableLoadBalancer();
+    tls.balance(state, migrations, migrationsOut);
+    int count = 0;
+    Map<String,Integer> movedByTable = new HashMap<String,Integer>();
+    movedByTable.put("t1", new Integer(0));
+    movedByTable.put("t2", new Integer(0));
+    movedByTable.put("t3", new Integer(0));
+    for (TabletMigration migration : migrationsOut) {
+      if (migration.oldServer.equals(svr))
+        count++;
+      String key = migration.tablet.getTableId().toString();
+      movedByTable.put(key, movedByTable.get(key) + 1);
+    }
+    Assert.assertEquals(15, count);
+    for (Integer moved : movedByTable.values()) {
+      Assert.assertEquals(5, moved.intValue());
+    }
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/test/java/org/apache/accumulo/server/security/SystemCredentialsTest.java
----------------------------------------------------------------------
diff --git a/server/base/src/test/java/org/apache/accumulo/server/security/SystemCredentialsTest.java b/server/base/src/test/java/org/apache/accumulo/server/security/SystemCredentialsTest.java
new file mode 100644
index 0000000..f29fb27
--- /dev/null
+++ b/server/base/src/test/java/org/apache/accumulo/server/security/SystemCredentialsTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.security;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.UUID;
+
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.impl.ConnectorImpl;
+import org.apache.accumulo.core.security.Credentials;
+import org.apache.accumulo.server.security.SystemCredentials.SystemToken;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * 
+ */
+public class SystemCredentialsTest {
+  
+  @BeforeClass
+  public static void setUp() throws IOException {
+    File testInstanceId = new File(new File(new File(new File("target"), "instanceTest"), "instance_id"), UUID.fromString(
+        "00000000-0000-0000-0000-000000000000").toString());
+    if (!testInstanceId.exists()) {
+      testInstanceId.getParentFile().mkdirs();
+      testInstanceId.createNewFile();
+    }
+  }
+  
+  /**
+   * This is a test to ensure the string literal in {@link ConnectorImpl#ConnectorImpl(Instance, Credentials)} is kept up-to-date if we move the
+   * {@link SystemToken}<br/>
+   * This check will not be needed after ACCUMULO-1578
+   */
+  @Test
+  public void testSystemToken() {
+    assertEquals("org.apache.accumulo.server.security.SystemCredentials$SystemToken", SystemToken.class.getName());
+    assertEquals(SystemCredentials.get().getToken().getClass(), SystemToken.class);
+  }
+  
+  @Test
+  public void testSystemCredentials() {
+    Credentials a = SystemCredentials.get();
+    Credentials b = SystemCredentials.get();
+    assertTrue(a == b);
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/test/java/org/apache/accumulo/server/security/handler/ZKAuthenticatorTest.java
----------------------------------------------------------------------
diff --git a/server/base/src/test/java/org/apache/accumulo/server/security/handler/ZKAuthenticatorTest.java b/server/base/src/test/java/org/apache/accumulo/server/security/handler/ZKAuthenticatorTest.java
new file mode 100644
index 0000000..9700c8a
--- /dev/null
+++ b/server/base/src/test/java/org/apache/accumulo/server/security/handler/ZKAuthenticatorTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.security.handler;
+
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.SystemPermission;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.core.util.ByteArraySet;
+import org.apache.accumulo.server.security.handler.ZKSecurityTool;
+
+import junit.framework.TestCase;
+
+public class ZKAuthenticatorTest extends TestCase {
+  public void testPermissionIdConversions() {
+    for (SystemPermission s : SystemPermission.values())
+      assertTrue(s.equals(SystemPermission.getPermissionById(s.getId())));
+    
+    for (TablePermission s : TablePermission.values())
+      assertTrue(s.equals(TablePermission.getPermissionById(s.getId())));
+  }
+  
+  public void testAuthorizationConversion() {
+    ByteArraySet auths = new ByteArraySet();
+    for (int i = 0; i < 300; i += 3)
+      auths.add(Integer.toString(i).getBytes());
+    
+    Authorizations converted = new Authorizations(auths);
+    byte[] test = ZKSecurityTool.convertAuthorizations(converted);
+    Authorizations test2 = ZKSecurityTool.convertAuthorizations(test);
+    assertTrue(auths.size() == test2.size());
+    for (byte[] s : auths) {
+      assertTrue(test2.contains(s));
+    }
+  }
+  
+  public void testSystemConversion() {
+    Set<SystemPermission> perms = new TreeSet<SystemPermission>();
+    for (SystemPermission s : SystemPermission.values())
+      perms.add(s);
+    
+    Set<SystemPermission> converted = ZKSecurityTool.convertSystemPermissions(ZKSecurityTool.convertSystemPermissions(perms));
+    assertTrue(perms.size() == converted.size());
+    for (SystemPermission s : perms)
+      assertTrue(converted.contains(s));
+  }
+  
+  public void testTableConversion() {
+    Set<TablePermission> perms = new TreeSet<TablePermission>();
+    for (TablePermission s : TablePermission.values())
+      perms.add(s);
+    
+    Set<TablePermission> converted = ZKSecurityTool.convertTablePermissions(ZKSecurityTool.convertTablePermissions(perms));
+    assertTrue(perms.size() == converted.size());
+    for (TablePermission s : perms)
+      assertTrue(converted.contains(s));
+  }
+  
+  public void testEncryption() {
+    byte[] rawPass = "myPassword".getBytes();
+    byte[] storedBytes;
+    try {
+      storedBytes = ZKSecurityTool.createPass(rawPass);
+      assertTrue(ZKSecurityTool.checkPass(rawPass, storedBytes));
+    } catch (AccumuloException e) {
+      e.printStackTrace();
+      assertTrue(false);
+    }
+  }
+}


Mime
View raw message