accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ctubb...@apache.org
Subject [50/54] [partial] ACCUMULO-658, ACCUMULO-656 Split server into separate modules
Date Fri, 01 Nov 2013 00:56:29 GMT
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/ServerConstants.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/ServerConstants.java b/server/base/src/main/java/org/apache/accumulo/server/ServerConstants.java
new file mode 100644
index 0000000..9e0ac39
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/ServerConstants.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server;
+
+import java.io.IOException;
+
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public class ServerConstants {
+  
+  // versions should never be negative
+  public static final Integer WIRE_VERSION = 2;
+  
+  /**
+   * current version reflects the addition of a separate root table (ACCUMULO-1481)
+   */
+  public static final int DATA_VERSION = 6;
+  public static final int PREV_DATA_VERSION = 5;
+  
+  private static String[] baseDirs = null;
+  private static String defaultBaseDir = null;
+
+  /**
+   * Computes once, then caches, the base dir: INSTANCE_DFS_URI (or, when unset, the default Hadoop FileSystem URI) followed by INSTANCE_DFS_DIR.
+   */
+  public static synchronized String getDefaultBaseDir() {
+    if (defaultBaseDir != null)
+      return defaultBaseDir;
+    
+    String instanceDir = ServerConfiguration.getSiteConfiguration().get(Property.INSTANCE_DFS_DIR);
+    String configuredUri = ServerConfiguration.getSiteConfiguration().get(Property.INSTANCE_DFS_URI);
+    String unnormalized;
+    if (configuredUri != null && !configuredUri.isEmpty()) {
+      if (!configuredUri.contains(":"))
+        throw new IllegalArgumentException("Expected fully qualified URI for " + Property.INSTANCE_DFS_URI.getKey() + " got " + configuredUri);
+      unnormalized = configuredUri + instanceDir;
+    } else {
+      try {
+        unnormalized = FileSystem.get(CachedConfiguration.getInstance()).getUri().toString() + instanceDir;
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+    
+    defaultBaseDir = new Path(unnormalized).toString();
+    return defaultBaseDir;
+  }
+
+  // these are functions to delay loading the Accumulo configuration unless we must
+  public static synchronized String[] getBaseDirs() {
+    if (baseDirs != null)
+      return baseDirs;
+    
+    String instanceDir = ServerConfiguration.getSiteConfiguration().get(Property.INSTANCE_DFS_DIR);
+    String volumes = ServerConfiguration.getSiteConfiguration().get(Property.INSTANCE_VOLUMES);
+    if (volumes == null || volumes.isEmpty()) {
+      baseDirs = new String[] {getDefaultBaseDir()};
+      return baseDirs;
+    }
+    
+    // reject any entry without a ':' before deriving the directories; nothing is cached on failure
+    String[] volumeUris = volumes.split(",");
+    for (String volumeUri : volumeUris) {
+      if (!volumeUri.contains(":"))
+        throw new IllegalArgumentException("Expected fully qualified URI for " + Property.INSTANCE_VOLUMES.getKey() + " got " + volumeUri);
+    }
+    baseDirs = prefix(volumeUris, instanceDir);
+    return baseDirs;
+  }
+  
+  /** Returns a new array holding each base joined by "/" to suffix, after dropping one leading "/" from suffix. */
+  public static String[] prefix(String[] bases, String suffix) {
+    String relative = suffix.startsWith("/") ? suffix.substring(1) : suffix;
+    String[] joined = new String[bases.length];
+    int next = 0;
+    for (String base : bases)
+      joined[next++] = base + "/" + relative;
+    return joined;
+  }
+  
+  public static final String TABLE_DIR = "tables";
+  public static final String RECOVERY_DIR = "recovery";
+  public static final String WAL_DIR = "wal";
+
+  public static String[] getTablesDirs() {
+    return prefix(getBaseDirs(), TABLE_DIR);
+  }
+
+  public static String[] getRecoveryDirs() {
+    return prefix(getBaseDirs(), RECOVERY_DIR);
+  }
+  
+  public static String[] getWalDirs() {
+    return prefix(getBaseDirs(), WAL_DIR);
+  }
+  
+  public static String[] getWalogArchives() {
+    return prefix(getBaseDirs(), "walogArchive");
+  }
+  
+  public static Path getInstanceIdLocation() {
+    return new Path(getBaseDirs()[0], "instance_id");
+  }
+  
+  public static Path getDataVersionLocation() {
+    return new Path(getBaseDirs()[0], "version");
+  }
+  
+  public static String[] getRootTableDirs() {
+    return prefix(getTablesDirs(), RootTable.ID);
+  }
+  
+  public static String[] getMetadataTableDirs() {
+    return prefix(getTablesDirs(), MetadataTable.ID);
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/ServerOpts.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/ServerOpts.java b/server/base/src/main/java/org/apache/accumulo/server/ServerOpts.java
new file mode 100644
index 0000000..95fee8f
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/ServerOpts.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server;
+
+import org.apache.accumulo.core.cli.Help;
+
+import com.beust.jcommander.Parameter;
+
+public class ServerOpts extends Help {
+  @Parameter(names={"-a", "--address"}, description = "address to bind to")
+  String address = null;
+  
+  /** Returns the address supplied with -a/--address, or "0.0.0.0" when none was given. */
+  public String getAddress() {
+    String fallback = "0.0.0.0";
+    return address == null ? fallback : address;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/cli/ClientOnDefaultTable.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/cli/ClientOnDefaultTable.java b/server/base/src/main/java/org/apache/accumulo/server/cli/ClientOnDefaultTable.java
new file mode 100644
index 0000000..53f5ac2
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/cli/ClientOnDefaultTable.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.cli;
+
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+
+public class ClientOnDefaultTable extends org.apache.accumulo.core.cli.ClientOnDefaultTable {
+  {
+    principal = "root";
+  }
+
+  @Override
+  synchronized public Instance getInstance() {
+    if (cachedInstance == null) { // first call: choose the implementation once, later calls reuse it
+      if (mock)
+        cachedInstance = new MockInstance(instance);
+      else if (instance == null)
+        cachedInstance = HdfsZooInstance.getInstance();
+      else
+        cachedInstance = new ZooKeeperInstance(this.instance, this.zookeepers);
+    }
+    return cachedInstance;
+  }
+  public ClientOnDefaultTable(String table) {
+    super(table);
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/cli/ClientOnRequiredTable.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/cli/ClientOnRequiredTable.java b/server/base/src/main/java/org/apache/accumulo/server/cli/ClientOnRequiredTable.java
new file mode 100644
index 0000000..e9e9bf1
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/cli/ClientOnRequiredTable.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.cli;
+
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+
+public class ClientOnRequiredTable extends org.apache.accumulo.core.cli.ClientOnRequiredTable {
+  {
+    principal = "root";
+  }
+  
+  @Override
+  synchronized public Instance getInstance() {
+    if (cachedInstance == null) { // first call: choose the implementation once, later calls reuse it
+      if (mock)
+        cachedInstance = new MockInstance(instance);
+      else if (instance == null)
+        cachedInstance = HdfsZooInstance.getInstance();
+      else
+        cachedInstance = new ZooKeeperInstance(this.instance, this.zookeepers);
+    }
+    return cachedInstance;
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/cli/ClientOpts.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/cli/ClientOpts.java b/server/base/src/main/java/org/apache/accumulo/server/cli/ClientOpts.java
new file mode 100644
index 0000000..6f3516a
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/cli/ClientOpts.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.cli;
+
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+
+public class ClientOpts extends org.apache.accumulo.core.cli.ClientOpts {
+  
+  {
+    principal = "root";
+  }
+
+  /** Resolves the instance without storing it in this object: mock if requested, ZooKeeper-backed if a name is set, else HdfsZooInstance. */
+  @Override
+  public Instance getInstance() {
+    if (mock)
+      return new MockInstance(instance);
+    if (instance != null)
+      return new ZooKeeperInstance(this.instance, this.zookeepers);
+    return HdfsZooInstance.getInstance();
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java b/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
new file mode 100644
index 0000000..606941d
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
@@ -0,0 +1,776 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.client;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.impl.ServerClient;
+import org.apache.accumulo.core.client.impl.TabletLocator;
+import org.apache.accumulo.core.client.impl.TabletLocator.TabletLocation;
+import org.apache.accumulo.core.client.impl.Translator;
+import org.apache.accumulo.core.client.impl.thrift.ClientService;
+import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.thrift.TKeyExtent;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.file.FileUtil;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.security.Credentials;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.core.util.LoggingRunnable;
+import org.apache.accumulo.core.util.NamingThreadFactory;
+import org.apache.accumulo.core.util.StopWatch;
+import org.apache.accumulo.core.util.ThriftUtil;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.fs.VolumeManagerImpl;
+import org.apache.accumulo.trace.instrument.TraceRunnable;
+import org.apache.accumulo.trace.instrument.Tracer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TServiceClient;
+
+public class BulkImporter {
+  
+  private static final Logger log = Logger.getLogger(BulkImporter.class);
+  
+  public static List<String> bulkLoad(AccumuloConfiguration conf, Instance instance, Credentials creds, long tid, String tableId, List<String> files,
+      String errorDir, boolean setTime) throws IOException, AccumuloException, AccumuloSecurityException, ThriftTableOperationException {
+    // run the import with a throwaway importer, then report the paths recorded in the stats' completeFailures map
+    AssignmentStats outcome = new BulkImporter(conf, instance, creds, tid, tableId, setTime).importFiles(files, new Path(errorDir));
+    List<String> failedFiles = new ArrayList<String>();
+    for (Path failed : outcome.completeFailures.keySet())
+      failedFiles.add(failed.toString());
+    return failedFiles;
+  }
+  
+  private StopWatch<Timers> timer;
+  
+  private static enum Timers { // keys for the StopWatch kept in the timer field; TOTAL is started first and stopped last in importFiles
+    EXAMINE_MAP_FILES, QUERY_METADATA, IMPORT_MAP_FILES, SLEEP, TOTAL
+  }
+  
+  private Instance instance;
+  private Credentials credentials;
+  private String tableId;
+  private long tid;
+  private AccumuloConfiguration acuConf;
+  private boolean setTime;
+  
+  public BulkImporter(AccumuloConfiguration conf, Instance instance, Credentials credentials, long tid, String tableId, boolean setTime) {
+    this.acuConf = conf; // the constructor only stores its arguments; no validation or I/O happens here
+    this.instance = instance;
+    this.credentials = credentials;
+    this.tableId = tableId;
+    this.tid = tid;
+    this.setTime = setTime;
+  }
+  
+  /**
+   * Imports the given map files: finds the tablets each file overlaps, requests the assignments, and retries failed key extents (with a growing sleep) until
+   * none remain or a file has failed more than {@link Property#TSERV_BULK_RETRY} times. The failureDir argument is not read by this method.
+   * @return statistics about the attempted, failed and abandoned assignments
+   */
+  public AssignmentStats importFiles(List<String> files, Path failureDir) throws IOException, AccumuloException, AccumuloSecurityException,
+      ThriftTableOperationException {
+    int numThreads = acuConf.getCount(Property.TSERV_BULK_PROCESS_THREADS);
+    int numAssignThreads = acuConf.getCount(Property.TSERV_BULK_ASSIGNMENT_THREADS);
+    
+    timer = new StopWatch<Timers>(Timers.class);
+    timer.start(Timers.TOTAL);
+    
+    Configuration conf = CachedConfiguration.getInstance();
+    final VolumeManager fs = VolumeManagerImpl.get(acuConf);
+    
+    Set<Path> paths = new HashSet<Path>();
+    for (String file : files) {
+      paths.add(new Path(file));
+    }
+    AssignmentStats assignmentStats = new AssignmentStats(paths.size());
+    
+    final Map<Path,List<KeyExtent>> completeFailures = Collections.synchronizedSortedMap(new TreeMap<Path,List<KeyExtent>>());
+    final TabletLocator locator = TabletLocator.getLocator(instance, new Text(tableId));
+    
+    try {
+      final Map<Path,List<TabletLocation>> assignments = Collections.synchronizedSortedMap(new TreeMap<Path,List<TabletLocation>>());
+      
+      timer.start(Timers.EXAMINE_MAP_FILES);
+      ExecutorService threadPool = Executors.newFixedThreadPool(numThreads, new NamingThreadFactory("findOverlapping"));
+      
+      for (Path path : paths) {
+        final Path mapFile = path;
+        Runnable getAssignments = new Runnable() {
+          @Override
+          public void run() {
+            List<TabletLocation> tabletsToAssignMapFileTo = Collections.emptyList();
+            try {
+              tabletsToAssignMapFileTo = findOverlappingTablets(instance.getConfiguration(), fs, locator, mapFile, credentials);
+            } catch (Exception ex) {
+              // keep the cause in the log; the file is then recorded as a complete failure below
+              log.warn("Unable to find tablets that overlap file " + mapFile.toString(), ex);
+            }
+            log.debug("Map file " + mapFile + " found to overlap " + tabletsToAssignMapFileTo.size() + " tablets");
+            if (tabletsToAssignMapFileTo.size() == 0) {
+              List<KeyExtent> empty = Collections.emptyList();
+              completeFailures.put(mapFile, empty);
+            } else
+              assignments.put(mapFile, tabletsToAssignMapFileTo);
+          }
+        };
+        threadPool.submit(new TraceRunnable(new LoggingRunnable(log, getAssignments)));
+      }
+      threadPool.shutdown();
+      while (!threadPool.isTerminated()) {
+        try {
+          threadPool.awaitTermination(60, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+          // restore the interrupt flag so code further up the stack can still observe it
+          Thread.currentThread().interrupt();
+          throw new RuntimeException(e);
+        }
+      }
+      timer.stop(Timers.EXAMINE_MAP_FILES);
+      
+      assignmentStats.attemptingAssignments(assignments);
+      Map<Path,List<KeyExtent>> assignmentFailures = assignMapFiles(acuConf, instance, conf, credentials, fs, tableId, assignments, paths, numAssignThreads,
+          numThreads);
+      assignmentStats.assignmentsFailed(assignmentFailures);
+      
+      Map<Path,Integer> failureCount = new TreeMap<Path,Integer>();
+      for (Entry<Path,List<KeyExtent>> entry : assignmentFailures.entrySet())
+        failureCount.put(entry.getKey(), 1);
+      
+      long sleepTime = 2 * 1000;
+      while (assignmentFailures.size() > 0) {
+        sleepTime = Math.min(sleepTime * 2, 60 * 1000);
+        locator.invalidateCache();
+        // assumption about assignment failures is that they are caused by a split
+        // happening or a missing location
+        //
+        // for splits we need to find children key extents that cover the
+        // same key range and are contiguous (no holes, no overlap)
+        
+        timer.start(Timers.SLEEP);
+        UtilWaitThread.sleep(sleepTime);
+        timer.stop(Timers.SLEEP);
+        
+        log.debug("Trying to assign " + assignmentFailures.size() + " map files that previously failed on some key extents");
+        assignments.clear();
+        
+        // for failed key extents, try to find children key extents to
+        // assign to
+        for (Entry<Path,List<KeyExtent>> entry : assignmentFailures.entrySet()) {
+          Iterator<KeyExtent> keListIter = entry.getValue().iterator();
+          List<TabletLocation> tabletsToAssignMapFileTo = new ArrayList<TabletLocation>();
+          
+          while (keListIter.hasNext()) {
+            KeyExtent ke = keListIter.next();
+            timer.start(Timers.QUERY_METADATA);
+            try {
+              tabletsToAssignMapFileTo.addAll(findOverlappingTablets(instance.getConfiguration(), fs, locator, entry.getKey(), ke, credentials));
+              keListIter.remove();
+            } catch (Exception ex) {
+              log.warn("Exception finding overlapping tablets, will retry tablet " + ke, ex);
+            } finally {
+              // always stop the timer, even when the lookup threw, so it is never left running for the next extent
+              timer.stop(Timers.QUERY_METADATA);
+            }
+          }
+          
+          if (tabletsToAssignMapFileTo.size() > 0)
+            assignments.put(entry.getKey(), tabletsToAssignMapFileTo);
+        }
+        
+        assignmentStats.attemptingAssignments(assignments);
+        Map<Path,List<KeyExtent>> assignmentFailures2 = assignMapFiles(acuConf, instance, conf, credentials, fs, tableId, assignments, paths, numAssignThreads,
+            numThreads);
+        assignmentStats.assignmentsFailed(assignmentFailures2);
+        
+        // merge assignmentFailures2 into assignmentFailures
+        for (Entry<Path,List<KeyExtent>> entry : assignmentFailures2.entrySet()) {
+          assignmentFailures.get(entry.getKey()).addAll(entry.getValue());
+          Integer fc = failureCount.get(entry.getKey());
+          if (fc == null)
+            fc = 0;
+          
+          failureCount.put(entry.getKey(), fc + 1);
+        }
+        
+        // remove map files that have no more key extents to assign
+        Iterator<Entry<Path,List<KeyExtent>>> afIter = assignmentFailures.entrySet().iterator();
+        while (afIter.hasNext()) {
+          Entry<Path,List<KeyExtent>> entry = afIter.next();
+          if (entry.getValue().size() == 0)
+            afIter.remove();
+        }
+        
+        Set<Entry<Path,Integer>> failureIter = failureCount.entrySet();
+        for (Entry<Path,Integer> entry : failureIter) {
+          int retries = acuConf.getCount(Property.TSERV_BULK_RETRY);
+          if (entry.getValue() > retries && assignmentFailures.get(entry.getKey()) != null) {
+            log.error("Map file " + entry.getKey() + " failed more than " + retries + " times, giving up.");
+            completeFailures.put(entry.getKey(), assignmentFailures.get(entry.getKey()));
+            assignmentFailures.remove(entry.getKey());
+          }
+        }
+      }
+      assignmentStats.assignmentsAbandoned(completeFailures);
+      Set<Path> failedFailures = processFailures(completeFailures);
+      assignmentStats.unrecoveredMapFiles(failedFailures);
+      
+      timer.stop(Timers.TOTAL);
+      printReport();
+      return assignmentStats;
+    } finally {
+      locator.invalidateCache();
+    }
+  }
+  
+  /** Logs, at debug level, the time spent in each phase and its share of the TOTAL timer. */
+  private void printReport() {
+    // add up every timer except TOTAL; whatever TOTAL has beyond that sum is reported as "Misc"
+    long accounted = 0;
+    for (Timers phase : Timers.values()) {
+      if (phase != Timers.TOTAL)
+        accounted += timer.get(phase);
+    }
+    
+    log.debug("BULK IMPORT TIMING STATISTICS");
+    log.debug(String.format("Examine map files    : %,10.2f secs %6.2f%s", timer.getSecs(Timers.EXAMINE_MAP_FILES),
+        100.0 * timer.get(Timers.EXAMINE_MAP_FILES) / timer.get(Timers.TOTAL), "%"));
+    log.debug(String.format("Query %-14s : %,10.2f secs %6.2f%s", MetadataTable.NAME, timer.getSecs(Timers.QUERY_METADATA),
+        100.0 * timer.get(Timers.QUERY_METADATA) / timer.get(Timers.TOTAL), "%"));
+    log.debug(String.format("Import Map Files     : %,10.2f secs %6.2f%s", timer.getSecs(Timers.IMPORT_MAP_FILES),
+        100.0 * timer.get(Timers.IMPORT_MAP_FILES) / timer.get(Timers.TOTAL), "%"));
+    log.debug(String.format("Sleep                : %,10.2f secs %6.2f%s", timer.getSecs(Timers.SLEEP),
+        100.0 * timer.get(Timers.SLEEP) / timer.get(Timers.TOTAL), "%"));
+    log.debug(String.format("Misc                 : %,10.2f secs %6.2f%s", (timer.get(Timers.TOTAL) - accounted) / 1000.0,
+        100.0 * (timer.get(Timers.TOTAL) - accounted) / timer.get(Timers.TOTAL), "%"));
+    log.debug(String.format("Total                : %,10.2f secs", timer.getSecs(Timers.TOTAL)));
+  }
+  
+  /**
+   * Logs, at debug level, every abandoned map file together with the key extents recorded for it.
+   * 
+   * @return always an empty set
+   */
+  private Set<Path> processFailures(Map<Path,List<KeyExtent>> completeFailures) {
+    // we should check if map file was not assigned to any tablets, then we
+    // should just move it; not currently being done?
+    
+    if (completeFailures.size() > 0) {
+      log.debug("The following map files failed ");
+      for (Entry<Path,List<KeyExtent>> failed : completeFailures.entrySet()) {
+        Path file = failed.getKey();
+        for (KeyExtent extent : failed.getValue())
+          log.debug("\t" + file + " -> " + extent);
+      }
+    }
+    
+    return Collections.emptySet();
+  }
+  
+  private class AssignmentInfo {
+    // a key extent paired with the (possibly estimated) number of file bytes destined for it
+    KeyExtent ke;
+    long estSize;
+    
+    public AssignmentInfo(KeyExtent keyExtent, Long estSize) {
+      this.ke = keyExtent;
+      this.estSize = estSize.longValue();
+    }
+  }
+  
+  private static List<KeyExtent> extentsOf(List<TabletLocation> locations) {
+    List<KeyExtent> extents = new ArrayList<KeyExtent>(locations.size()); // presized: one extent per location, same order
+    for (Iterator<TabletLocation> it = locations.iterator(); it.hasNext();)
+      extents.add(it.next().tablet_extent);
+    return extents;
+  }
+  
+  /**
+   * Builds, for every file in assignments, the list of tablets it goes to together with an estimated size per tablet. Files mapped to one tablet use the
+   * file length directly; the others are estimated on a thread pool, falling back to an even split of the file length when estimation fails.
+   */
+  private Map<Path,List<AssignmentInfo>> estimateSizes(final AccumuloConfiguration acuConf, final Configuration conf, final VolumeManager vm,
+      Map<Path,List<TabletLocation>> assignments, Collection<Path> paths, int numThreads) {
+    long t1 = System.currentTimeMillis();
+    final Map<Path,Long> mapFileSizes = new TreeMap<Path,Long>();
+    
+    try {
+      for (Path path : paths) {
+        FileSystem fs = vm.getFileSystemByPath(path);
+        mapFileSizes.put(path, fs.getContentSummary(path).getLength());
+      }
+    } catch (IOException e) {
+      log.error("Failed to get map files in for " + paths + ": " + e.getMessage(), e);
+      throw new RuntimeException(e);
+    }
+    
+    final Map<Path,List<AssignmentInfo>> ais = Collections.synchronizedMap(new TreeMap<Path,List<AssignmentInfo>>());
+    ExecutorService threadPool = Executors.newFixedThreadPool(numThreads, new NamingThreadFactory("estimateSizes"));
+    
+    for (final Entry<Path,List<TabletLocation>> entry : assignments.entrySet()) {
+      if (entry.getValue().size() == 1) {
+        TabletLocation tabletLocation = entry.getValue().get(0);
+        // if the tablet completely contains the map file, there is no
+        // need to estimate its size
+        ais.put(entry.getKey(), Collections.singletonList(new AssignmentInfo(tabletLocation.tablet_extent, mapFileSizes.get(entry.getKey()))));
+        continue;
+      }
+      
+      Runnable estimationTask = new Runnable() {
+        @Override
+        public void run() {
+          Map<KeyExtent,Long> estimatedSizes = null;
+          try {
+            FileSystem fs = vm.getFileSystemByPath(entry.getKey());
+            estimatedSizes = FileUtil.estimateSizes(acuConf, entry.getKey(), mapFileSizes.get(entry.getKey()), extentsOf(entry.getValue()), conf, fs);
+          } catch (IOException e) {
+            // log the exception itself, not just its message, so the stack trace is kept
+            log.warn("Failed to estimate map file sizes " + e.getMessage(), e);
+          }
+          
+          if (estimatedSizes == null) {
+            // estimation failed, do a simple estimation
+            estimatedSizes = new TreeMap<KeyExtent,Long>();
+            long estSize = (long) (mapFileSizes.get(entry.getKey()) / (double) entry.getValue().size());
+            for (TabletLocation tl : entry.getValue())
+              estimatedSizes.put(tl.tablet_extent, estSize);
+          }
+          
+          List<AssignmentInfo> assignmentInfoList = new ArrayList<AssignmentInfo>(estimatedSizes.size());
+          for (Entry<KeyExtent,Long> entry2 : estimatedSizes.entrySet())
+            assignmentInfoList.add(new AssignmentInfo(entry2.getKey(), entry2.getValue()));
+          
+          ais.put(entry.getKey(), assignmentInfoList);
+        }
+      };
+      
+      threadPool.submit(new TraceRunnable(new LoggingRunnable(log, estimationTask)));
+    }
+    
+    threadPool.shutdown();
+    
+    while (!threadPool.isTerminated()) {
+      try {
+        threadPool.awaitTermination(60, TimeUnit.SECONDS);
+      } catch (InterruptedException e) {
+        // restore the interrupt flag instead of dumping the trace to stderr; the cause travels in the exception
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e);
+      }
+    }
+    
+    long t2 = System.currentTimeMillis();
+    
+    log.debug(String.format("Estimated map files sizes in %6.2f secs", (t2 - t1) / 1000.0));
+    
+    return ais;
+  }
+  
+  private static Map<KeyExtent,String> locationsOf(Map<Path,List<TabletLocation>> assignments) {
+    Map<KeyExtent,String> result = new HashMap<KeyExtent,String>();
+    for (List<TabletLocation> entry : assignments.values()) {
+      for (TabletLocation tl : entry) {
+        result.put(tl.tablet_extent, tl.tablet_location);
+      }
+    }
+    return result;
+  }
+  
+  private Map<Path,List<KeyExtent>> assignMapFiles(AccumuloConfiguration acuConf, Instance instance, Configuration conf, Credentials credentials,
+      VolumeManager fs, String tableId, Map<Path,List<TabletLocation>> assignments, Collection<Path> paths, int numThreads, int numMapThreads) {
+    timer.start(Timers.EXAMINE_MAP_FILES);
+    Map<Path,List<AssignmentInfo>> assignInfo = estimateSizes(acuConf, conf, fs, assignments, paths, numMapThreads);
+    timer.stop(Timers.EXAMINE_MAP_FILES);
+    
+    Map<Path,List<KeyExtent>> ret;
+    
+    timer.start(Timers.IMPORT_MAP_FILES);
+    ret = assignMapFiles(credentials, tableId, assignInfo, locationsOf(assignments), numThreads);
+    timer.stop(Timers.IMPORT_MAP_FILES);
+    
+    return ret;
+  }
+  
+  private class AssignmentTask implements Runnable {
+    final Map<Path,List<KeyExtent>> assignmentFailures;
+    String location;
+    Credentials credentials;
+    private Map<KeyExtent,List<PathSize>> assignmentsPerTablet;
+    
+    public AssignmentTask(Credentials credentials, Map<Path,List<KeyExtent>> assignmentFailures, String tableName, String location,
+        Map<KeyExtent,List<PathSize>> assignmentsPerTablet) {
+      this.assignmentFailures = assignmentFailures;
+      this.location = location;
+      this.assignmentsPerTablet = assignmentsPerTablet;
+      this.credentials = credentials;
+    }
+    
+    private void handleFailures(Collection<KeyExtent> failures, String message) {
+      for (KeyExtent ke : failures) {
+        List<PathSize> mapFiles = assignmentsPerTablet.get(ke);
+        synchronized (assignmentFailures) {
+          for (PathSize pathSize : mapFiles) {
+            List<KeyExtent> existingFailures = assignmentFailures.get(pathSize.path);
+            if (existingFailures == null) {
+              existingFailures = new ArrayList<KeyExtent>();
+              assignmentFailures.put(pathSize.path, existingFailures);
+            }
+            
+            existingFailures.add(ke);
+          }
+        }
+        
+        log.info("Could not assign " + mapFiles.size() + " map files to tablet " + ke + " because : " + message + ".  Will retry ...");
+      }
+    }
+    
+    @Override
+    public void run() {
+      HashSet<Path> uniqMapFiles = new HashSet<Path>();
+      for (List<PathSize> mapFiles : assignmentsPerTablet.values())
+        for (PathSize ps : mapFiles)
+          uniqMapFiles.add(ps.path);
+      
+      log.debug("Assigning " + uniqMapFiles.size() + " map files to " + assignmentsPerTablet.size() + " tablets at " + location);
+      
+      try {
+        List<KeyExtent> failures = assignMapFiles(credentials, location, assignmentsPerTablet);
+        handleFailures(failures, "Not Serving Tablet");
+      } catch (AccumuloException e) {
+        handleFailures(assignmentsPerTablet.keySet(), e.getMessage());
+      } catch (AccumuloSecurityException e) {
+        handleFailures(assignmentsPerTablet.keySet(), e.getMessage());
+      }
+    }
+    
+  }
+  
+  /**
+   * Pairs a map file with the size estimated for the portion of it assigned to one tablet.
+   */
+  private class PathSize {
+    Path path;
+    long estSize;
+    
+    public PathSize(Path mapFile, long estSize) {
+      this.estSize = estSize;
+      this.path = mapFile;
+    }
+    
+    /** Renders as the path, one space, then the estimated size. */
+    @Override
+    public String toString() {
+      final StringBuilder text = new StringBuilder();
+      text.append(path).append(" ").append(estSize);
+      return text.toString();
+    }
+  }
+  
+  /**
+   * Sends the sized assignments to the tablet servers. Assignments are first regrouped by tablet, then by the tablet server hosting each tablet. One
+   * {@link AssignmentTask} per tablet server is run on a fixed-size thread pool, and this method blocks until all of them have finished.
+   * 
+   * @param credentials
+   *          credentials passed to each assignment task
+   * @param tableName
+   *          passed through to each assignment task
+   * @param assignments
+   *          for each map file, the tablets it goes to along with the estimated size
+   * @param locations
+   *          tablet server location per tablet extent; tablets without an entry are recorded as failures without contacting any server
+   * @param numThreads
+   *          size of the thread pool used to contact tablet servers
+   * @return for each map file, the extents it could not be assigned to (a synchronized map shared with the tasks)
+   * @throws RuntimeException
+   *           wrapping the {@link InterruptedException} if the calling thread is interrupted while waiting; the interrupt flag is restored first
+   */
+  private Map<Path,List<KeyExtent>> assignMapFiles(Credentials credentials, String tableName, Map<Path,List<AssignmentInfo>> assignments,
+      Map<KeyExtent,String> locations, int numThreads) {
+    
+    // group assignments by tablet
+    Map<KeyExtent,List<PathSize>> assignmentsPerTablet = new TreeMap<KeyExtent,List<PathSize>>();
+    for (Entry<Path,List<AssignmentInfo>> entry : assignments.entrySet()) {
+      Path mapFile = entry.getKey();
+      List<AssignmentInfo> tabletsToAssignMapFileTo = entry.getValue();
+      
+      for (AssignmentInfo ai : tabletsToAssignMapFileTo) {
+        List<PathSize> mapFiles = assignmentsPerTablet.get(ai.ke);
+        if (mapFiles == null) {
+          mapFiles = new ArrayList<PathSize>();
+          assignmentsPerTablet.put(ai.ke, mapFiles);
+        }
+        
+        mapFiles.add(new PathSize(mapFile, ai.estSize));
+      }
+    }
+    
+    // group assignments by tabletserver
+    
+    Map<Path,List<KeyExtent>> assignmentFailures = Collections.synchronizedMap(new TreeMap<Path,List<KeyExtent>>());
+    
+    TreeMap<String,Map<KeyExtent,List<PathSize>>> assignmentsPerTabletServer = new TreeMap<String,Map<KeyExtent,List<PathSize>>>();
+    
+    for (Entry<KeyExtent,List<PathSize>> entry : assignmentsPerTablet.entrySet()) {
+      KeyExtent ke = entry.getKey();
+      String location = locations.get(ke);
+      
+      if (location == null) {
+        // no server to ask: every file headed for this tablet fails for this round
+        for (PathSize pathSize : entry.getValue()) {
+          synchronized (assignmentFailures) {
+            List<KeyExtent> failures = assignmentFailures.get(pathSize.path);
+            if (failures == null) {
+              failures = new ArrayList<KeyExtent>();
+              assignmentFailures.put(pathSize.path, failures);
+            }
+            
+            failures.add(ke);
+          }
+        }
+        
+        log.warn("Could not assign " + entry.getValue().size() + " map files to tablet " + ke + " because it had no location, will retry ...");
+        
+        continue;
+      }
+      
+      Map<KeyExtent,List<PathSize>> apt = assignmentsPerTabletServer.get(location);
+      if (apt == null) {
+        apt = new TreeMap<KeyExtent,List<PathSize>>();
+        assignmentsPerTabletServer.put(location, apt);
+      }
+      
+      apt.put(entry.getKey(), entry.getValue());
+    }
+    
+    ExecutorService threadPool = Executors.newFixedThreadPool(numThreads, new NamingThreadFactory("submit"));
+    
+    for (Entry<String,Map<KeyExtent,List<PathSize>>> entry : assignmentsPerTabletServer.entrySet()) {
+      String location = entry.getKey();
+      threadPool.submit(new AssignmentTask(credentials, assignmentFailures, tableName, location, entry.getValue()));
+    }
+    
+    threadPool.shutdown();
+    
+    while (!threadPool.isTerminated()) {
+      try {
+        threadPool.awaitTermination(60, TimeUnit.SECONDS);
+      } catch (InterruptedException e) {
+        // keep the interruption visible to callers further up the stack; the exception itself travels as the cause
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e);
+      }
+    }
+    
+    return assignmentFailures;
+  }
+  
+  /**
+   * Asks the tablet server at {@code location} to bulk import the given map files into the given tablets.
+   * 
+   * @param credentials
+   *          credentials converted to thrift form and sent with the request
+   * @param location
+   *          address of the tablet server to contact
+   * @param assignmentsPerTablet
+   *          for each tablet on that server, the map files to load and their estimated sizes
+   * @return the extents the tablet server reported back as failures
+   * @throws AccumuloSecurityException
+   *           if the tablet server answers with a {@code ThriftSecurityException}
+   * @throws AccumuloException
+   *           wrapping any other failure while contacting the server or translating the request/response
+   */
+  private List<KeyExtent> assignMapFiles(Credentials credentials, String location, Map<KeyExtent,List<PathSize>> assignmentsPerTablet)
+      throws AccumuloException, AccumuloSecurityException {
+    try {
+      long timeInMillis = instance.getConfiguration().getTimeInMillis(Property.TSERV_BULK_TIMEOUT);
+      TabletClientService.Iface client = ThriftUtil.getTServerClient(location, timeInMillis);
+      try {
+        // build the thrift request: tablet -> (file name -> file info carrying the estimated size)
+        HashMap<KeyExtent,Map<String,org.apache.accumulo.core.data.thrift.MapFileInfo>> files = new HashMap<KeyExtent,Map<String,org.apache.accumulo.core.data.thrift.MapFileInfo>>();
+        for (Entry<KeyExtent,List<PathSize>> entry : assignmentsPerTablet.entrySet()) {
+          HashMap<String,org.apache.accumulo.core.data.thrift.MapFileInfo> tabletFiles = new HashMap<String,org.apache.accumulo.core.data.thrift.MapFileInfo>();
+          files.put(entry.getKey(), tabletFiles);
+          
+          for (PathSize pathSize : entry.getValue()) {
+            org.apache.accumulo.core.data.thrift.MapFileInfo mfi = new org.apache.accumulo.core.data.thrift.MapFileInfo(pathSize.estSize);
+            tabletFiles.put(pathSize.path.toString(), mfi);
+          }
+        }
+        
+        log.debug("Asking " + location + " to bulk load " + files);
+        List<TKeyExtent> failures = client.bulkImport(Tracer.traceInfo(), credentials.toThrift(instance), tid, Translator.translate(files, Translator.KET),
+            setTime);
+        
+        return Translator.translate(failures, Translator.TKET);
+      } finally {
+        ThriftUtil.returnClient((TServiceClient) client);
+      }
+    } catch (ThriftSecurityException e) {
+      throw new AccumuloSecurityException(e.user, e.code, e);
+    } catch (Throwable t) {
+      // report through the logger instead of stderr; the caller only logs the message of the wrapped exception
+      log.warn("Bulk import request to " + location + " failed", t);
+      throw new AccumuloException(t);
+    }
+  }
+  
+  /**
+   * Finds the tablets overlapped by a map file, without restricting the search to a row range (both the start and end row are passed as {@code null}).
+   */
+  public static List<TabletLocation> findOverlappingTablets(AccumuloConfiguration acuConf, VolumeManager fs, TabletLocator locator, Path file,
+      Credentials credentials) throws Exception {
+    final Text unboundedStart = null;
+    final Text unboundedEnd = null;
+    return findOverlappingTablets(acuConf, fs, locator, file, unboundedStart, unboundedEnd, credentials);
+  }
+  
+  /**
+   * Finds the tablets that now cover the row range of an extent for which an assignment failed. The locator's cached entry for the extent is invalidated
+   * first, then the file is searched from the first row that can belong to the extent up to the extent's end row.
+   * 
+   * @param failed
+   *          the extent whose assignment failed; must not be null
+   * @return the tablet locations overlapping the file within the failed extent's range
+   */
+  public static List<TabletLocation> findOverlappingTablets(AccumuloConfiguration acuConf, VolumeManager fs, TabletLocator locator, Path file,
+      KeyExtent failed,
+      Credentials credentials) throws Exception {
+    locator.invalidateCache(failed);
+    Text start = failed.getPrevEndRow();
+    if (start != null) {
+      // The first row that can be in this tablet is the previous end row followed by a zero byte. Range.followingPrefix() would instead jump past every
+      // row that merely begins with the previous end row (prev end "a" -> "b"), silently skipping rows such as "ab" that belong to this tablet.
+      start = new Text(start);
+      start.append(byte0, 0, byte0.length);
+    }
+    return findOverlappingTablets(acuConf, fs, locator, file, start, failed.getEndRow(), credentials);
+  }
+  
+  // A single zero byte; appended to a copy of a tablet's end row to build the row at which the search for the next overlapping tablet resumes.
+  final static byte[] byte0 = {0};
+  
+  /**
+   * Walks a map file and collects the location of every tablet that contains at least one of its rows within the given range. The file is seeked to the
+   * current row, the tablet holding the first row found is looked up, and the search resumes just past that tablet's end row. It stops when the file has
+   * no more data, when a tablet has no end row, or when a tablet's end row is not before {@code endRow}.
+   * 
+   * @param startRow
+   *          row to start seeking from, or null to start from an empty row
+   * @param endRow
+   *          row at which to stop advancing, or null for no limit
+   * @return the tablet locations found, in the order they were encountered
+   */
+  public static List<TabletLocation> findOverlappingTablets(AccumuloConfiguration acuConf, VolumeManager vm, TabletLocator locator, Path file, Text startRow,
+      Text endRow, Credentials credentials) throws Exception {
+    final List<TabletLocation> overlapping = new ArrayList<TabletLocation>();
+    final Collection<ByteSequence> noColumnFamilies = Collections.emptyList();
+    final String filename = file.toString();
+    final FileSystem fs = vm.getFileSystemByPath(file);
+    final FileSKVIterator reader = FileOperations.getInstance().openReader(filename, true, fs, fs.getConf(), acuConf);
+    try {
+      // a null seek row means there is nothing further to look at
+      Text seekRow = (startRow == null) ? new Text() : startRow;
+      while (seekRow != null) {
+        reader.seek(new Range(seekRow, null), noColumnFamilies, false);
+        if (!reader.hasTop()) {
+          // nothing in the file at or after the seek row
+          break;
+        }
+        
+        final Text firstRowFound = reader.getTopKey().getRow();
+        final TabletLocation tabletLocation = locator.locateTablet(credentials, firstRowFound, false, true);
+        overlapping.add(tabletLocation);
+        
+        final Text tabletEndRow = tabletLocation.tablet_extent.getEndRow();
+        final boolean reachedLimit = endRow != null && tabletEndRow != null && tabletEndRow.compareTo(endRow) >= 0;
+        if (tabletEndRow == null || reachedLimit) {
+          seekRow = null;
+        } else {
+          // resume right after the tablet just found
+          seekRow = new Text(tabletEndRow);
+          seekRow.append(byte0, 0, byte0.length);
+        }
+      }
+    } finally {
+      reader.close();
+    }
+    return overlapping;
+  }
+  
+  /**
+   * Accumulates statistics about a bulk import: how many map files were assigned to each tablet, which assignments were abandoned, and which map files
+   * could not be recovered. {@link #toString()} renders a human readable report.
+   */
+  public static class AssignmentStats {
+    private Map<KeyExtent,Integer> counts;
+    private int numUniqueMapFiles;
+    private Map<Path,List<KeyExtent>> completeFailures = null;
+    private Set<Path> failedFailures = null;
+    
+    /**
+     * @param fileCount
+     *          number of distinct map files in the import; used as the denominator of the file failure percentage
+     */
+    AssignmentStats(int fileCount) {
+      counts = new HashMap<KeyExtent,Integer>();
+      numUniqueMapFiles = fileCount;
+    }
+    
+    /** Adds one to the count of every tablet for each map file about to be assigned to it. */
+    void attemptingAssignments(Map<Path,List<TabletLocation>> assignments) {
+      for (Entry<Path,List<TabletLocation>> entry : assignments.entrySet()) {
+        for (TabletLocation tl : entry.getValue()) {
+          
+          Integer count = getCount(tl.tablet_extent);
+          
+          counts.put(tl.tablet_extent, count + 1);
+        }
+      }
+    }
+    
+    /** Subtracts one from the count of every tablet for each map file whose assignment to it failed. */
+    void assignmentsFailed(Map<Path,List<KeyExtent>> assignmentFailures) {
+      for (Entry<Path,List<KeyExtent>> entry : assignmentFailures.entrySet()) {
+        for (KeyExtent ke : entry.getValue()) {
+          
+          Integer count = getCount(ke);
+          
+          counts.put(ke, count - 1);
+        }
+      }
+    }
+    
+    /** Records the assignments that were given up on; reported as "map files with failures" and "tablets with failures". */
+    void assignmentsAbandoned(Map<Path,List<KeyExtent>> completeFailures) {
+      this.completeFailures = completeFailures;
+    }
+    
+    /** Replaces the entry of a split tablet by one entry per child, each starting with the parent's count. */
+    void tabletSplit(KeyExtent parent, Collection<KeyExtent> children) {
+      Integer count = getCount(parent);
+      
+      counts.remove(parent);
+      
+      for (KeyExtent keyExtent : children)
+        counts.put(keyExtent, count);
+    }
+    
+    /** Returns the count recorded for an extent, or zero when none has been recorded. */
+    private Integer getCount(KeyExtent parent) {
+      Integer count = counts.get(parent);
+      
+      if (count == null) {
+        count = 0;
+      }
+      return count;
+    }
+    
+    /** Records the map files that could not be recovered; reported as "failed failed map files". */
+    void unrecoveredMapFiles(Set<Path> failedFailures) {
+      this.failedFailures = failedFailures;
+    }
+    
+    /** Divides, yielding zero instead of NaN or infinity when the denominator is zero. */
+    private static double ratio(double numerator, int denominator) {
+      return denominator == 0 ? 0.0 : numerator / denominator;
+    }
+    
+    /** Expresses {@code part} as a percentage of {@code whole}; zero when {@code whole} is zero. */
+    private static double percent(int part, int whole) {
+      return ratio(part * 100.0, whole);
+    }
+    
+    /**
+     * Renders the statistics report. Failure collections that were never supplied are reported as empty, and an import without tablets or files reports
+     * zeros rather than sentinel values or NaN.
+     */
+    @Override
+    public String toString() {
+      // either collection may never have been set; treat that as "no failures" instead of throwing
+      Map<Path,List<KeyExtent>> abandoned = completeFailures;
+      if (abandoned == null)
+        abandoned = Collections.emptyMap();
+      Set<Path> unrecovered = failedFailures;
+      if (unrecovered == null)
+        unrecovered = Collections.emptySet();
+      
+      StringBuilder sb = new StringBuilder();
+      int tabletCount = counts.size();
+      int totalAssignments = 0;
+      int tabletsImportedTo = 0;
+      
+      int min = Integer.MAX_VALUE, max = Integer.MIN_VALUE;
+      
+      for (Integer perTablet : counts.values()) {
+        totalAssignments += perTablet;
+        if (perTablet > 0)
+          tabletsImportedTo++;
+        
+        if (perTablet < min)
+          min = perTablet;
+        
+        if (perTablet > max)
+          max = perTablet;
+      }
+      
+      if (tabletCount == 0) {
+        // nothing was counted, so do not print the search sentinels
+        min = 0;
+        max = 0;
+      }
+      
+      double mean = ratio(totalAssignments, tabletCount);
+      double sumOfSquares = 0;
+      
+      for (Integer perTablet : counts.values())
+        sumOfSquares += Math.pow(perTablet - mean, 2);
+      
+      double stddev = Math.sqrt(ratio(sumOfSquares, tabletCount));
+      
+      Set<KeyExtent> failedTablets = new HashSet<KeyExtent>();
+      for (List<KeyExtent> ft : abandoned.values())
+        failedTablets.addAll(ft);
+      
+      sb.append("BULK IMPORT ASSIGNMENT STATISTICS\n");
+      sb.append(String.format("# of map files            : %,10d%n", numUniqueMapFiles));
+      sb.append(String.format("# map files with failures : %,10d %6.2f%s%n", abandoned.size(), percent(abandoned.size(), numUniqueMapFiles), "%"));
+      sb.append(String.format("# failed failed map files : %,10d %s%n", unrecovered.size(), unrecovered.size() > 0 ? " <-- THIS IS BAD" : ""));
+      sb.append(String.format("# of tablets              : %,10d%n", tabletCount));
+      sb.append(String.format("# tablets imported to     : %,10d %6.2f%s%n", tabletsImportedTo, percent(tabletsImportedTo, tabletCount), "%"));
+      sb.append(String.format("# tablets with failures   : %,10d %6.2f%s%n", failedTablets.size(), percent(failedTablets.size(), tabletCount), "%"));
+      sb.append(String.format("min map files per tablet  : %,10d%n", min));
+      sb.append(String.format("max map files per tablet  : %,10d%n", max));
+      sb.append(String.format("avg map files per tablet  : %,10.2f (std dev = %.2f)%n", mean, stddev));
+      return sb.toString();
+    }
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java b/server/base/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java
new file mode 100644
index 0000000..3f1aaa2
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.client;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.Callable;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.impl.Tables;
+import org.apache.accumulo.core.client.impl.thrift.ClientService;
+import org.apache.accumulo.core.client.impl.thrift.ConfigurationType;
+import org.apache.accumulo.core.client.impl.thrift.SecurityErrorCode;
+import org.apache.accumulo.core.client.impl.thrift.TDiskUsage;
+import org.apache.accumulo.core.client.impl.thrift.TableOperation;
+import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType;
+import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken.AuthenticationTokenSerializer;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.Credentials;
+import org.apache.accumulo.core.security.SystemPermission;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.core.security.thrift.TCredentials;
+import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.security.AuditedSecurityOperation;
+import org.apache.accumulo.server.security.SecurityOperation;
+import org.apache.accumulo.server.util.TableDiskUsage;
+import org.apache.accumulo.server.zookeeper.TransactionWatcher;
+import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader;
+import org.apache.accumulo.trace.thrift.TInfo;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TException;
+
+/**
+ * Server side implementation of the thrift {@link ClientService}. Most operations delegate to the shared {@link SecurityOperation}, which receives the
+ * caller's credentials with every call; the remaining ones expose instance information, configuration, bulk import, class checks and disk usage.
+ */
+public class ClientServiceHandler implements ClientService.Iface {
+  private static final Logger log = Logger.getLogger(ClientServiceHandler.class);
+  private static final SecurityOperation security = AuditedSecurityOperation.getInstance();
+  protected final TransactionWatcher transactionWatcher;
+  private final Instance instance;
+  private final VolumeManager fs;
+  
+  public ClientServiceHandler(Instance instance, TransactionWatcher transactionWatcher, VolumeManager fs) {
+    this.instance = instance;
+    this.transactionWatcher = transactionWatcher;
+    this.fs = fs;
+  }
+  
+  /**
+   * Resolves a table name to its id. If the name is unknown the table cache is cleared and the lookup is repeated once, in case the table exists but the
+   * cache had not caught up yet.
+   * 
+   * @param tableName
+   *          name of the table to resolve
+   * @param operation
+   *          operation reported in the exception when the table cannot be found
+   * @return the id of the table
+   * @throws ThriftTableOperationException
+   *           of type NOTFOUND when the name is still unknown after the cache was cleared
+   */
+  protected String checkTableId(String tableName, TableOperation operation) throws ThriftTableOperationException {
+    String tableId = Tables.getNameToIdMap(instance).get(tableName);
+    if (tableId == null) {
+      // maybe the table exist, but the cache was not updated yet... so try to clear the cache and check again
+      Tables.clearCache(instance);
+      tableId = Tables.getNameToIdMap(instance).get(tableName);
+      if (tableId == null)
+        throw new ThriftTableOperationException(null, tableName, operation, TableOperationExceptionType.NOTFOUND, null);
+    }
+    return tableId;
+  }
+  
+  @Override
+  public String getInstanceId() {
+    return instance.getInstanceID();
+  }
+  
+  @Override
+  public String getRootTabletLocation() {
+    return instance.getRootTabletLocation();
+  }
+  
+  @Override
+  public String getZooKeepers() {
+    return instance.getZooKeepers();
+  }
+  
+  @Override
+  public void ping(TCredentials credentials) {
+    // anybody can call this; no authentication check
+    log.info("Master reports: I just got pinged!");
+  }
+  
+  /** Authenticates the caller's own credentials; a security failure is logged and rethrown. */
+  @Override
+  public boolean authenticate(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException {
+    try {
+      return security.authenticateUser(credentials, credentials);
+    } catch (ThriftSecurityException e) {
+      log.error(e);
+      throw e;
+    }
+  }
+  
+  /** Authenticates {@code toAuth} on behalf of the caller identified by {@code credentials}; a security failure is logged and rethrown. */
+  @Override
+  public boolean authenticateUser(TInfo tinfo, TCredentials credentials, TCredentials toAuth) throws ThriftSecurityException {
+    try {
+      return security.authenticateUser(credentials, toAuth);
+    } catch (ThriftSecurityException e) {
+      log.error(e);
+      throw e;
+    }
+  }
+  
+  @Override
+  public void changeAuthorizations(TInfo tinfo, TCredentials credentials, String user, List<ByteBuffer> authorizations) throws ThriftSecurityException {
+    security.changeAuthorizations(credentials, user, new Authorizations(authorizations));
+  }
+  
+  @Override
+  public void changeLocalUserPassword(TInfo tinfo, TCredentials credentials, String principal, ByteBuffer password) throws ThriftSecurityException {
+    PasswordToken token = new PasswordToken(password);
+    Credentials toChange = new Credentials(principal, token);
+    security.changePassword(credentials, toChange);
+  }
+  
+  /** Creates a user with the given password and an empty set of authorizations. */
+  @Override
+  public void createLocalUser(TInfo tinfo, TCredentials credentials, String principal, ByteBuffer password) throws ThriftSecurityException {
+    PasswordToken token = new PasswordToken(password);
+    Credentials newUser = new Credentials(principal, token);
+    security.createUser(credentials, newUser, new Authorizations());
+  }
+  
+  @Override
+  public void dropLocalUser(TInfo tinfo, TCredentials credentials, String user) throws ThriftSecurityException {
+    security.dropUser(credentials, user);
+  }
+  
+  @Override
+  public List<ByteBuffer> getUserAuthorizations(TInfo tinfo, TCredentials credentials, String user) throws ThriftSecurityException {
+    return security.getUserAuthorizations(credentials, user).getAuthorizationsBB();
+  }
+  
+  @Override
+  public void grantSystemPermission(TInfo tinfo, TCredentials credentials, String user, byte permission) throws ThriftSecurityException {
+    security.grantSystemPermission(credentials, user, SystemPermission.getPermissionById(permission));
+  }
+  
+  @Override
+  public void grantTablePermission(TInfo tinfo, TCredentials credentials, String user, String tableName, byte permission) throws ThriftSecurityException,
+      ThriftTableOperationException {
+    String tableId = checkTableId(tableName, TableOperation.PERMISSION);
+    security.grantTablePermission(credentials, user, tableId, TablePermission.getPermissionById(permission));
+  }
+  
+  @Override
+  public void revokeSystemPermission(TInfo tinfo, TCredentials credentials, String user, byte permission) throws ThriftSecurityException {
+    security.revokeSystemPermission(credentials, user, SystemPermission.getPermissionById(permission));
+  }
+  
+  @Override
+  public void revokeTablePermission(TInfo tinfo, TCredentials credentials, String user, String tableName, byte permission) throws ThriftSecurityException,
+      ThriftTableOperationException {
+    String tableId = checkTableId(tableName, TableOperation.PERMISSION);
+    security.revokeTablePermission(credentials, user, tableId, TablePermission.getPermissionById(permission));
+  }
+  
+  @Override
+  public boolean hasSystemPermission(TInfo tinfo, TCredentials credentials, String user, byte sysPerm) throws ThriftSecurityException {
+    return security.hasSystemPermission(credentials, user, SystemPermission.getPermissionById(sysPerm));
+  }
+  
+  @Override
+  public boolean hasTablePermission(TInfo tinfo, TCredentials credentials, String user, String tableName, byte tblPerm) throws ThriftSecurityException,
+      ThriftTableOperationException {
+    String tableId = checkTableId(tableName, TableOperation.PERMISSION);
+    return security.hasTablePermission(credentials, user, tableId, TablePermission.getPermissionById(tblPerm));
+  }
+  
+  @Override
+  public Set<String> listLocalUsers(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException {
+    return security.listUsers(credentials);
+  }
+  
+  /**
+   * Copies a configuration into a plain map after authenticating the caller and invalidating the configuration's cache. Properties for which
+   * {@link Property#isSensitive(String)} is true are left out.
+   */
+  private static Map<String,String> conf(TCredentials credentials, AccumuloConfiguration conf) throws TException {
+    security.authenticateUser(credentials, credentials);
+    conf.invalidateCache();
+    
+    Map<String,String> result = new HashMap<String,String>();
+    for (Entry<String,String> entry : conf) {
+      String key = entry.getKey();
+      if (!Property.isSensitive(key))
+        result.put(key, entry.getValue());
+    }
+    return result;
+  }
+  
+  /**
+   * Returns the current, site or default configuration, without sensitive properties.
+   * 
+   * @throws RuntimeException
+   *           if {@code type} is none of the handled configuration types
+   */
+  @Override
+  public Map<String,String> getConfiguration(TInfo tinfo, TCredentials credentials, ConfigurationType type) throws TException {
+    switch (type) {
+      case CURRENT:
+        return conf(credentials, new ServerConfiguration(instance).getConfiguration());
+      case SITE:
+        return conf(credentials, ServerConfiguration.getSiteConfiguration());
+      case DEFAULT:
+        return conf(credentials, AccumuloConfiguration.getDefaultConfiguration());
+    }
+    throw new RuntimeException("Unexpected configuration type " + type);
+  }
+  
+  @Override
+  public Map<String,String> getTableConfiguration(TInfo tinfo, TCredentials credentials, String tableName) throws TException, ThriftTableOperationException {
+    String tableId = checkTableId(tableName, null);
+    return conf(credentials, new ServerConfiguration(instance).getTableConfiguration(tableId));
+  }
+  
+  /**
+   * Bulk loads files into a table under the given transaction id. The caller must be allowed to perform system actions. The load itself is run through
+   * the transaction watcher.
+   * 
+   * @return whatever {@code BulkImporter.bulkLoad} returns for the request
+   * @throws ThriftSecurityException
+   *           if the caller may not perform system actions, or the load fails with an {@link AccumuloSecurityException}
+   * @throws TException
+   *           wrapping any other failure
+   */
+  @Override
+  public List<String> bulkImportFiles(TInfo tinfo, final TCredentials credentials, final long tid, final String tableId, final List<String> files,
+      final String errorDir, final boolean setTime) throws ThriftSecurityException, ThriftTableOperationException, TException {
+    try {
+      if (!security.canPerformSystemActions(credentials))
+        throw new AccumuloSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
+      return transactionWatcher.run(Constants.BULK_ARBITRATOR_TYPE, tid, new Callable<List<String>>() {
+        @Override
+        public List<String> call() throws Exception {
+          return BulkImporter.bulkLoad(new ServerConfiguration(instance).getConfiguration(), instance, new Credentials(credentials.getPrincipal(),
+              AuthenticationTokenSerializer.deserialize(credentials.getTokenClassName(), credentials.getToken())), tid, tableId, files, errorDir, setTime);
+        }
+      });
+    } catch (AccumuloSecurityException e) {
+      throw e.asThriftException();
+    } catch (Exception ex) {
+      throw new TException(ex);
+    }
+  }
+  
+  @Override
+  public boolean isActive(TInfo tinfo, long tid) throws TException {
+    return transactionWatcher.isActive(tid);
+  }
+  
+  /**
+   * Checks that {@code className} can be loaded as an implementation of {@code interfaceMatch} and instantiated with its no-argument constructor.
+   * 
+   * @return true if loading and instantiation succeed; false (after logging a warning) on a class cast, class lookup, instantiation or access failure
+   */
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  @Override
+  public boolean checkClass(TInfo tinfo, TCredentials credentials, String className, String interfaceMatch) throws TException {
+    security.authenticateUser(credentials, credentials);
+    
+    ClassLoader loader = getClass().getClassLoader();
+    Class shouldMatch;
+    try {
+      shouldMatch = loader.loadClass(interfaceMatch);
+      Class test = AccumuloVFSClassLoader.loadClass(className, shouldMatch);
+      test.newInstance();
+      return true;
+    } catch (ClassCastException e) {
+      log.warn("Error checking object types", e);
+      return false;
+    } catch (ClassNotFoundException e) {
+      log.warn("Error checking object types", e);
+      return false;
+    } catch (InstantiationException e) {
+      log.warn("Error checking object types", e);
+      return false;
+    } catch (IllegalAccessException e) {
+      log.warn("Error checking object types", e);
+      return false;
+    }
+  }
+  
+  /**
+   * Same check as {@link #checkClass}, but the class is loaded with the class loader of the table's configured classpath context when the table has one,
+   * and with the default VFS class loader otherwise.
+   * 
+   * @return true if loading and instantiation succeed; false (after logging a warning) on any exception during the check
+   */
+  @Override
+  public boolean checkTableClass(TInfo tinfo, TCredentials credentials, String tableName, String className, String interfaceMatch) throws TException,
+      ThriftTableOperationException, ThriftSecurityException {
+    
+    security.authenticateUser(credentials, credentials);
+    
+    String tableId = checkTableId(tableName, null);
+    
+    ClassLoader loader = getClass().getClassLoader();
+    Class<?> shouldMatch;
+    try {
+      shouldMatch = loader.loadClass(interfaceMatch);
+      
+      // a single lookup of the table configuration is enough to read the classpath context
+      String context = new ServerConfiguration(instance).getTableConfiguration(tableId).get(Property.TABLE_CLASSPATH);
+      
+      ClassLoader currentLoader;
+      
+      if (context != null && !context.equals("")) {
+        currentLoader = AccumuloVFSClassLoader.getContextManager().getClassLoader(context);
+      } else {
+        currentLoader = AccumuloVFSClassLoader.getClassLoader();
+      }
+      
+      Class<?> test = currentLoader.loadClass(className).asSubclass(shouldMatch);
+      test.newInstance();
+      return true;
+    } catch (Exception e) {
+      log.warn("Error checking object types", e);
+      return false;
+    }
+  }
+  
+  /**
+   * Computes disk usage for a set of tables. Every table must exist and the caller must be allowed to scan it.
+   * 
+   * @param tables
+   *          names of the tables to report on
+   * @return one entry per group of tables returned by {@code TableDiskUsage.getDiskUsage}, with the usage reported for that group
+   * @throws ThriftTableOperationException
+   *           if one of the tables does not exist
+   * @throws ThriftSecurityException
+   *           if the caller may not scan one of the tables, or the connection fails with an {@link AccumuloSecurityException}
+   * @throws TException
+   *           wrapping an {@link AccumuloException} or {@link IOException}
+   */
+  @Override
+  public List<TDiskUsage> getDiskUsage(Set<String> tables, TCredentials credentials) throws ThriftTableOperationException, ThriftSecurityException, TException {
+    try {
+      AuthenticationToken token = AuthenticationTokenSerializer.deserialize(credentials.getTokenClassName(), credentials.getToken());
+      Connector conn = instance.getConnector(credentials.getPrincipal(), token);
+      
+      HashSet<String> tableIds = new HashSet<String>();
+      
+      for (String table : tables) {
+        // ensure that the table exists
+        String tableId = checkTableId(table, null);
+        tableIds.add(tableId);
+        if (!security.canScan(credentials, tableId))
+          throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
+      }
+      
+      // use the same set of tableIds that were validated above to avoid race conditions
+      Map<TreeSet<String>,Long> diskUsage = TableDiskUsage.getDiskUsage(new ServerConfiguration(instance).getConfiguration(), tableIds, fs, conn);
+      List<TDiskUsage> retUsages = new ArrayList<TDiskUsage>();
+      for (Map.Entry<TreeSet<String>,Long> usageItem : diskUsage.entrySet()) {
+        retUsages.add(new TDiskUsage(new ArrayList<String>(usageItem.getKey()), usageItem.getValue()));
+      }
+      return retUsages;
+      
+    } catch (AccumuloSecurityException e) {
+      throw e.asThriftException();
+    } catch (AccumuloException e) {
+      throw new TException(e);
+    } catch (IOException e) {
+      throw new TException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/client/HdfsZooInstance.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/client/HdfsZooInstance.java b/server/base/src/main/java/org/apache/accumulo/server/client/HdfsZooInstance.java
new file mode 100644
index 0000000..9e6bbe7
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/client/HdfsZooInstance.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.client;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.impl.ConnectorImpl;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.security.Credentials;
+import org.apache.accumulo.core.util.ByteBufferUtil;
+import org.apache.accumulo.core.util.OpTimer;
+import org.apache.accumulo.core.util.StringUtil;
+import org.apache.accumulo.core.util.TextUtil;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.ZooCache;
+import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.zookeeper.ZooLock;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+/**
+ * An implementation of Instance that looks in HDFS and ZooKeeper to find the master and root tablet location.
+ * 
+ */
+public class HdfsZooInstance implements Instance {
+  
+  public static class AccumuloNotInitializedException extends RuntimeException {
+    private static final long serialVersionUID = 1L;
+    
+    public AccumuloNotInitializedException(String string) {
+      super(string);
+    }
+  }
+  
+  private HdfsZooInstance() {
+    AccumuloConfiguration acuConf = ServerConfiguration.getSiteConfiguration();
+    zooCache = new ZooCache(acuConf.get(Property.INSTANCE_ZK_HOST), (int) acuConf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT));
+  }
+  
+  private static HdfsZooInstance cachedHdfsZooInstance = null;
+  
+  public static synchronized Instance getInstance() {
+    if (cachedHdfsZooInstance == null)
+      cachedHdfsZooInstance = new HdfsZooInstance();
+    return cachedHdfsZooInstance;
+  }
+  
+  private static ZooCache zooCache;
+  private static String instanceId = null;
+  private static final Logger log = Logger.getLogger(HdfsZooInstance.class);
+  
+  @Override
+  public String getRootTabletLocation() {
+    String zRootLocPath = ZooUtil.getRoot(this) + RootTable.ZROOT_TABLET_LOCATION;
+    
+    OpTimer opTimer = new OpTimer(log, Level.TRACE).start("Looking up root tablet location in zoocache.");
+    
+    byte[] loc = zooCache.get(zRootLocPath);
+    
+    opTimer.stop("Found root tablet at " + (loc == null ? null : new String(loc)) + " in %DURATION%");
+    
+    if (loc == null) {
+      return null;
+    }
+    
+    return new String(loc).split("\\|")[0];
+  }
+  
+  @Override
+  public List<String> getMasterLocations() {
+    
+    String masterLocPath = ZooUtil.getRoot(this) + Constants.ZMASTER_LOCK;
+    
+    OpTimer opTimer = new OpTimer(log, Level.TRACE).start("Looking up master location in zoocache.");
+    
+    byte[] loc = ZooLock.getLockData(zooCache, masterLocPath, null);
+    
+    opTimer.stop("Found master at " + (loc == null ? null : new String(loc)) + " in %DURATION%");
+    
+    if (loc == null) {
+      return Collections.emptyList();
+    }
+    
+    return Collections.singletonList(new String(loc));
+  }
+  
+  @Override
+  public String getInstanceID() {
+    if (instanceId == null)
+      _getInstanceID();
+    return instanceId;
+  }
+  
+  private static synchronized void _getInstanceID() {
+    if (instanceId == null) {
+      String instanceIdFromFile = ZooUtil.getInstanceIDFromHdfs(ServerConstants.getInstanceIdLocation());
+      instanceId = instanceIdFromFile;
+    }
+  }
+  
+  @Override
+  public String getInstanceName() {
+    return ZooKeeperInstance.lookupInstanceName(zooCache, UUID.fromString(getInstanceID()));
+  }
+  
+  @Override
+  public String getZooKeepers() {
+    return ServerConfiguration.getSiteConfiguration().get(Property.INSTANCE_ZK_HOST);
+  }
+  
+  @Override
+  public int getZooKeepersSessionTimeOut() {
+    return (int) ServerConfiguration.getSiteConfiguration().getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT);
+  }
+  
+  @Override
+  public Connector getConnector(String principal, AuthenticationToken token) throws AccumuloException, AccumuloSecurityException {
+    return new ConnectorImpl(this, new Credentials(principal, token));
+  }
+  
+  @Deprecated
+  @Override
+  public Connector getConnector(String user, byte[] pass) throws AccumuloException, AccumuloSecurityException {
+    return getConnector(user, new PasswordToken(pass));
+  }
+  
+  @Deprecated
+  @Override
+  public Connector getConnector(String user, ByteBuffer pass) throws AccumuloException, AccumuloSecurityException {
+    return getConnector(user, ByteBufferUtil.toBytes(pass));
+  }
+  
+  @Deprecated
+  @Override
+  public Connector getConnector(String user, CharSequence pass) throws AccumuloException, AccumuloSecurityException {
+    return getConnector(user, TextUtil.getBytes(new Text(pass.toString())));
+  }
+  
+  private AccumuloConfiguration conf = null;
+  
+  @Override
+  public AccumuloConfiguration getConfiguration() {
+    if (conf == null)
+      conf = new ServerConfiguration(this).getConfiguration();
+    return conf;
+  }
+  
+  @Override
+  public void setConfiguration(AccumuloConfiguration conf) {
+    this.conf = conf;
+  }
+  
+  public static void main(String[] args) {
+    Instance instance = HdfsZooInstance.getInstance();
+    System.out.println("Instance Name: " + instance.getInstanceName());
+    System.out.println("Instance ID: " + instance.getInstanceID());
+    System.out.println("ZooKeepers: " + instance.getZooKeepers());
+    System.out.println("Masters: " + StringUtil.join(instance.getMasterLocations(), ", "));
+  }
+
+  @Override
+  public void close() throws AccumuloException {
+    try {
+      zooCache.close();
+    } catch (InterruptedException e) {
+      throw new AccumuloException("Issues closing ZooKeeper, try again");
+    }
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/conf/ConfigSanityCheck.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/conf/ConfigSanityCheck.java b/server/base/src/main/java/org/apache/accumulo/server/conf/ConfigSanityCheck.java
new file mode 100644
index 0000000..442294f
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/conf/ConfigSanityCheck.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.conf;
+
+import org.apache.accumulo.server.client.HdfsZooInstance;
+
+public class ConfigSanityCheck {
+  
+  /**
+   * @param args
+   */
+  public static void main(String[] args) {
+    new ServerConfiguration(HdfsZooInstance.getInstance()).getConfiguration();
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/conf/ServerConfiguration.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/conf/ServerConfiguration.java b/server/base/src/main/java/org/apache/accumulo/server/conf/ServerConfiguration.java
new file mode 100644
index 0000000..c9fd5a1
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/conf/ServerConfiguration.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.conf;
+
+import java.security.SecurityPermission;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.ConfigSanityCheck;
+import org.apache.accumulo.core.conf.DefaultConfiguration;
+import org.apache.accumulo.core.conf.SiteConfiguration;
+import org.apache.accumulo.core.data.KeyExtent;
+
+public class ServerConfiguration {
+
+  // per-table configurations created so far, keyed by table id; every access synchronizes on the map itself
+  // NOTE(review): the key does not include the Instance, so the first Instance to request a table id determines the cached entry -- confirm this is intended
+  private static final Map<String,TableConfiguration> tableInstances = new HashMap<String,TableConfiguration>(1);
+  // permission checked against the SecurityManager, when one is installed, before configuration objects are handed out
+  private static final SecurityPermission CONFIGURATION_PERMISSION = new SecurityPermission("configurationPermission");
+
+  /**
+   * Returns the site configuration, layered over the default configuration.
+   * 
+   * @throws SecurityException
+   *           if a SecurityManager is installed and denies the configuration permission
+   */
+  public static synchronized SiteConfiguration getSiteConfiguration() {
+    checkPermissions();
+    return SiteConfiguration.getInstance(getDefaultConfiguration());
+  }
+
+  /**
+   * Asks the installed SecurityManager, if any, whether the caller holds the configuration permission. Does nothing when no SecurityManager is installed.
+   */
+  private static void checkPermissions() {
+    SecurityManager sm = System.getSecurityManager();
+    if (sm != null) {
+      sm.checkPermission(CONFIGURATION_PERMISSION);
+    }
+  }
+
+  /**
+   * Returns the ZooKeeper-backed configuration for the given instance, layered over the site configuration.
+   */
+  private static synchronized ZooConfiguration getZooConfiguration(Instance instance) {
+    checkPermissions();
+    return ZooConfiguration.getInstance(instance, getSiteConfiguration());
+  }
+
+  /**
+   * Returns the default configuration.
+   * 
+   * @throws SecurityException
+   *           if a SecurityManager is installed and denies the configuration permission
+   */
+  public static synchronized DefaultConfiguration getDefaultConfiguration() {
+    checkPermissions();
+    return DefaultConfiguration.getInstance();
+  }
+
+  /**
+   * Returns the system-wide configuration for the given instance; this is the same object {@link #getConfiguration()} returns for that instance.
+   */
+  public static synchronized AccumuloConfiguration getSystemConfiguration(Instance instance) {
+    return getZooConfiguration(instance);
+  }
+
+  /**
+   * Returns the configuration of one table. The first request for a table id creates the TableConfiguration on top of the system configuration, validates it
+   * and caches it; later requests for the same id return the cached object.
+   * 
+   * @param instance
+   *          instance supplying the instance id and system configuration when the entry has to be created
+   * @param tableId
+   *          id of the table
+   * @return the cached or newly created table configuration
+   */
+  public static TableConfiguration getTableConfiguration(Instance instance, String tableId) {
+    checkPermissions();
+    synchronized (tableInstances) {
+      TableConfiguration conf = tableInstances.get(tableId);
+      if (conf == null) {
+        conf = new TableConfiguration(instance.getInstanceID(), tableId, getSystemConfiguration(instance));
+        // this is the imported org.apache.accumulo.core.conf.ConfigSanityCheck, not the same-named class in this package
+        ConfigSanityCheck.validate(conf);
+        tableInstances.put(tableId, conf);
+      }
+      return conf;
+    }
+  }
+
+  /**
+   * Drops the cached configuration of the given table, if there is one.
+   */
+  static void removeTableIdInstance(String tableId) {
+    synchronized (tableInstances) {
+      tableInstances.remove(tableId);
+    }
+  }
+
+  /**
+   * Calls expireAllObservers() on every cached table configuration.
+   */
+  static void expireAllTableObservers() {
+    synchronized (tableInstances) {
+      for (Entry<String,TableConfiguration> entry : tableInstances.entrySet()) {
+        entry.getValue().expireAllObservers();
+      }
+    }
+  }
+
+  // the instance whose configuration the non-static accessors below return
+  private final Instance instance;
+
+  public ServerConfiguration(Instance instance) {
+    this.instance = instance;
+  }
+
+  /**
+   * Returns the configuration of the given table for this object's instance.
+   */
+  public TableConfiguration getTableConfiguration(String tableId) {
+    return getTableConfiguration(instance, tableId);
+  }
+
+  /**
+   * Returns the configuration of the table the given extent belongs to.
+   */
+  public TableConfiguration getTableConfiguration(KeyExtent extent) {
+    return getTableConfiguration(extent.getTableId().toString());
+  }
+
+  /**
+   * Returns the system-wide configuration for this object's instance.
+   */
+  public synchronized AccumuloConfiguration getConfiguration() {
+    return getZooConfiguration(instance);
+  }
+
+  public Instance getInstance() {
+    return instance;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfWatcher.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfWatcher.java b/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfWatcher.java
new file mode 100644
index 0000000..c407309
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfWatcher.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.conf;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+
+class TableConfWatcher implements Watcher {
+  static {
+    Logger.getLogger("org.apache.zookeeper").setLevel(Level.WARN);
+    Logger.getLogger("org.apache.hadoop.io.compress").setLevel(Level.WARN);
+  }
+  
+  private static final Logger log = Logger.getLogger(TableConfWatcher.class);
+  private Instance instance = null;
+  
+  TableConfWatcher(Instance instance) {
+    this.instance = instance;
+  }
+  
+  @Override
+  public void process(WatchedEvent event) {
+    String path = event.getPath();
+    if (log.isTraceEnabled())
+      log.trace("WatchEvent : " + path + " " + event.getState() + " " + event.getType());
+    
+    String tablesPrefix = ZooUtil.getRoot(instance) + Constants.ZTABLES + "/";
+    
+    String tableId = null;
+    String key = null;
+    
+    if (path != null) {
+      if (path.startsWith(tablesPrefix)) {
+        tableId = path.substring(tablesPrefix.length());
+        if (tableId.contains("/")) {
+          tableId = tableId.substring(0, tableId.indexOf('/'));
+          if (path.startsWith(tablesPrefix + tableId + Constants.ZTABLE_CONF + "/"))
+            key = path.substring((tablesPrefix + tableId + Constants.ZTABLE_CONF + "/").length());
+        }
+      }
+      
+      if (tableId == null) {
+        log.warn("Zookeeper told me about a path I was not watching " + path + " state=" + event.getState() + " type=" + event.getType());
+        return;
+      }
+    }
+    
+    switch (event.getType()) {
+      case NodeDataChanged:
+        if (log.isTraceEnabled())
+          log.trace("EventNodeDataChanged " + event.getPath());
+        if (key != null)
+          ServerConfiguration.getTableConfiguration(instance, tableId).propertyChanged(key);
+        break;
+      case NodeChildrenChanged:
+        ServerConfiguration.getTableConfiguration(instance, tableId).propertiesChanged(key);
+        break;
+      case NodeDeleted:
+        if (key == null) {
+          // only remove the AccumuloConfiguration object when a
+          // table node is deleted, not when a tables property is
+          // deleted.
+          ServerConfiguration.removeTableIdInstance(tableId);
+        }
+        break;
+      case None:
+        switch (event.getState()) {
+          case Expired:
+            ServerConfiguration.expireAllTableObservers();
+            break;
+          case SyncConnected:
+            break;
+          case Disconnected:
+            break;
+          default:
+            log.warn("EventNone event not handled path = " + event.getPath() + " state=" + event.getState());
+        }
+        break;
+      case NodeCreated:
+        switch (event.getState()) {
+          case SyncConnected:
+            break;
+          default:
+            log.warn("Event NodeCreated event not handled path = " + event.getPath() + " state=" + event.getState());
+        }
+        break;
+      default:
+        log.warn("Event not handled path = " + event.getPath() + " state=" + event.getState() + " type = " + event.getType());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java b/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java
new file mode 100644
index 0000000..4c58153
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.conf;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.ConfigurationObserver;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.ZooCache;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.log4j.Logger;
+
+public class TableConfiguration extends AccumuloConfiguration {
+  private static final Logger log = Logger.getLogger(TableConfiguration.class);
+  
+  private static ZooCache tablePropCache = null;
+  private final String instanceId;
+  private final AccumuloConfiguration parent;
+  
+  private String table = null;
+  private Set<ConfigurationObserver> observers;
+  
+  public TableConfiguration(String instanceId, String table, AccumuloConfiguration parent) {
+    this.instanceId = instanceId;
+    this.table = table;
+    this.parent = parent;
+    
+    this.observers = Collections.synchronizedSet(new HashSet<ConfigurationObserver>());
+  }
+  
+  private static ZooCache getTablePropCache() {
+    Instance inst = HdfsZooInstance.getInstance();
+    if (tablePropCache == null)
+      synchronized (TableConfiguration.class) {
+        if (tablePropCache == null)
+          tablePropCache = new ZooCache(inst.getZooKeepers(), inst.getZooKeepersSessionTimeOut(), new TableConfWatcher(inst));
+      }
+    return tablePropCache;
+  }
+  
+  public void addObserver(ConfigurationObserver co) {
+    if (table == null) {
+      String err = "Attempt to add observer for non-table configuration";
+      log.error(err);
+      throw new RuntimeException(err);
+    }
+    iterator();
+    observers.add(co);
+  }
+  
+  public void removeObserver(ConfigurationObserver configObserver) {
+    if (table == null) {
+      String err = "Attempt to remove observer for non-table configuration";
+      log.error(err);
+      throw new RuntimeException(err);
+    }
+    observers.remove(configObserver);
+  }
+  
+  public void expireAllObservers() {
+    Collection<ConfigurationObserver> copy = Collections.unmodifiableCollection(observers);
+    for (ConfigurationObserver co : copy)
+      co.sessionExpired();
+  }
+  
+  public void propertyChanged(String key) {
+    Collection<ConfigurationObserver> copy = Collections.unmodifiableCollection(observers);
+    for (ConfigurationObserver co : copy)
+      co.propertyChanged(key);
+  }
+  
+  public void propertiesChanged(String key) {
+    Collection<ConfigurationObserver> copy = Collections.unmodifiableCollection(observers);
+    for (ConfigurationObserver co : copy)
+      co.propertiesChanged();
+  }
+  
+  @Override
+  public String get(Property property) {
+    String key = property.getKey();
+    String value = get(key);
+    
+    if (value == null || !property.getType().isValidFormat(value)) {
+      if (value != null)
+        log.error("Using default value for " + key + " due to improperly formatted " + property.getType() + ": " + value);
+      value = parent.get(property);
+    }
+    return value;
+  }
+  
+  private String get(String key) {
+    String zPath = ZooUtil.getRoot(instanceId) + Constants.ZTABLES + "/" + table + Constants.ZTABLE_CONF + "/" + key;
+    byte[] v = getTablePropCache().get(zPath);
+    String value = null;
+    if (v != null)
+      value = new String(v);
+    return value;
+  }
+  
+  @Override
+  public Iterator<Entry<String,String>> iterator() {
+    TreeMap<String,String> entries = new TreeMap<String,String>();
+    
+    for (Entry<String,String> parentEntry : parent)
+      entries.put(parentEntry.getKey(), parentEntry.getValue());
+    
+    List<String> children = getTablePropCache().getChildren(ZooUtil.getRoot(instanceId) + Constants.ZTABLES + "/" + table + Constants.ZTABLE_CONF);
+    if (children != null) {
+      for (String child : children) {
+        String value = get(child);
+        if (child != null && value != null)
+          entries.put(child, value);
+      }
+    }
+    
+    return entries.entrySet().iterator();
+  }
+  
+  public String getTableId() {
+    return table;
+  }
+}


Mime
View raw message