accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ctubb...@apache.org
Subject [4/9] accumulo git commit: ACCUMULO-3759 Fix Java 8 compiler warnings
Date Wed, 29 Apr 2015 01:03:41 GMT
ACCUMULO-3759 Fix Java 8 compiler warnings

* Add missing hashCode in class with equals
* Enforce one-type per file


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/6e2e6780
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/6e2e6780
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/6e2e6780

Branch: refs/heads/1.7
Commit: 6e2e6780fc59c86112fba30a5211081bb6e77979
Parents: f996387
Author: Christopher Tubbs <ctubbsii@apache.org>
Authored: Tue Apr 28 20:30:22 2015 -0400
Committer: Christopher Tubbs <ctubbsii@apache.org>
Committed: Tue Apr 28 20:30:22 2015 -0400

----------------------------------------------------------------------
 .../core/client/impl/OfflineIterator.java       | 340 ++++++++++++
 .../core/client/impl/OfflineScanner.java        | 314 -----------
 .../core/compaction/CompactionSettings.java     |  42 --
 .../accumulo/core/compaction/PatternType.java   |  28 +
 .../accumulo/core/compaction/SizeType.java      |  30 ++
 .../accumulo/core/compaction/StringType.java    |  24 +
 .../apache/accumulo/core/compaction/Type.java   |  21 +
 .../accumulo/core/compaction/UIntType.java      |  27 +
 .../core/file/DispatchingFileFactory.java       | 136 +++++
 .../accumulo/core/file/FileOperations.java      | 106 ----
 .../accumulo/core/cli/TestClientOpts.java       |   5 +
 .../client/CountingVerifyingReceiver.java       |  64 +++
 .../simple/client/RandomBatchScanner.java       |  38 --
 pom.xml                                         |   1 +
 .../accumulo/master/tableOps/BulkImport.java    | 363 -------------
 .../master/tableOps/CancelCompactions.java      |  23 -
 .../accumulo/master/tableOps/ChooseDir.java     |  53 ++
 .../accumulo/master/tableOps/CleanUp.java       | 287 ++++++++++
 .../master/tableOps/CleanUpBulkImport.java      |  64 +++
 .../accumulo/master/tableOps/CloneInfo.java     |  36 ++
 .../accumulo/master/tableOps/CloneMetadata.java |  54 ++
 .../master/tableOps/ClonePermissions.java       |  73 +++
 .../accumulo/master/tableOps/CloneTable.java    | 195 -------
 .../master/tableOps/CloneZookeeper.java         |  76 +++
 .../accumulo/master/tableOps/CompactRange.java  | 159 ------
 .../master/tableOps/CompactionDriver.java       | 188 +++++++
 .../master/tableOps/CompleteBulkImport.java     |  45 ++
 .../accumulo/master/tableOps/CopyFailed.java    | 158 ++++++
 .../accumulo/master/tableOps/CreateDir.java     |  51 ++
 .../master/tableOps/CreateImportDir.java        |  61 +++
 .../master/tableOps/CreateNamespace.java        | 137 -----
 .../accumulo/master/tableOps/CreateTable.java   | 251 ---------
 .../master/tableOps/DeleteNamespace.java        |  55 --
 .../accumulo/master/tableOps/DeleteTable.java   | 265 ----------
 .../accumulo/master/tableOps/ExportInfo.java    |  29 ++
 .../accumulo/master/tableOps/ExportTable.java   | 257 ---------
 .../master/tableOps/FinishCancelCompaction.java |  40 ++
 .../master/tableOps/FinishCloneTable.java       |  64 +++
 .../master/tableOps/FinishCreateNamespace.java  |  58 +++
 .../master/tableOps/FinishCreateTable.java      |  62 +++
 .../master/tableOps/FinishImportTable.java      |  68 +++
 .../tableOps/ImportPopulateZookeeper.java       | 104 ++++
 .../master/tableOps/ImportSetupPermissions.java |  65 +++
 .../accumulo/master/tableOps/ImportTable.java   | 521 -------------------
 .../master/tableOps/ImportedTableInfo.java      |  31 ++
 .../accumulo/master/tableOps/LoadFiles.java     | 209 ++++++++
 .../master/tableOps/MapImportFileNames.java     | 111 ++++
 .../master/tableOps/MoveExportedFiles.java      |  71 +++
 .../master/tableOps/NamespaceCleanUp.java       |  75 +++
 .../accumulo/master/tableOps/NamespaceInfo.java |  31 ++
 .../master/tableOps/PopulateMetadata.java       |  54 ++
 .../master/tableOps/PopulateMetadataTable.java  | 217 ++++++++
 .../master/tableOps/PopulateZookeeper.java      |  77 +++
 .../PopulateZookeeperWithNamespace.java         |  74 +++
 .../tableOps/SetupNamespacePermissions.java     |  55 ++
 .../master/tableOps/SetupPermissions.java       |  63 +++
 .../accumulo/master/tableOps/TableInfo.java     |  35 ++
 .../accumulo/master/tableOps/TableRangeOp.java  |  45 --
 .../master/tableOps/TableRangeOpWait.java       |  69 +++
 .../master/tableOps/WriteExportFiles.java       | 268 ++++++++++
 .../apache/accumulo/tserver/InMemoryMap.java    | 119 -----
 .../accumulo/tserver/MemKeyComparator.java      |  44 ++
 .../tserver/MemKeyConversionIterator.java       |  96 ++++
 .../PartialMutationSkippingIterator.java        |  54 ++
 .../accumulo/test/EstimateInMemMapOverhead.java | 317 -----------
 .../test/InMemoryMapMemoryUsageTest.java        | 102 ++++
 .../accumulo/test/IntObjectMemoryUsageTest.java |  65 +++
 .../apache/accumulo/test/MemoryUsageTest.java   |  64 +++
 .../accumulo/test/MutationMemoryUsageTest.java  |  98 ++++
 .../accumulo/test/TextMemoryUsageTest.java      |  82 +++
 .../accumulo/test/continuous/HistData.java      |  49 ++
 .../accumulo/test/continuous/Histogram.java     |  30 --
 72 files changed, 4406 insertions(+), 3237 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineIterator.java b/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineIterator.java
new file mode 100644
index 0000000..b035e3e
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineIterator.java
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.RowIterator;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.ConfigurationCopy;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Column;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyValue;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.data.impl.KeyExtent;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.IteratorUtil;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
+import org.apache.accumulo.core.iterators.system.ColumnFamilySkippingIterator;
+import org.apache.accumulo.core.iterators.system.ColumnQualifierFilter;
+import org.apache.accumulo.core.iterators.system.DeletingIterator;
+import org.apache.accumulo.core.iterators.system.MultiIterator;
+import org.apache.accumulo.core.iterators.system.VisibilityFilter;
+import org.apache.accumulo.core.master.state.tables.TableState;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.core.util.LocalityGroupUtil;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.core.volume.VolumeConfiguration;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.Text;
+
+class OfflineIterator implements Iterator<Entry<Key,Value>> {
+
+  static class OfflineIteratorEnvironment implements IteratorEnvironment {
+
+    private final Authorizations authorizations;
+
+    public OfflineIteratorEnvironment(Authorizations auths) {
+      this.authorizations = auths;
+    }
+
+    @Override
+    public SortedKeyValueIterator<Key,Value> reserveMapFileReader(String mapFileName) throws IOException {
+      throw new NotImplementedException();
+    }
+
+    @Override
+    public AccumuloConfiguration getConfig() {
+      return AccumuloConfiguration.getDefaultConfiguration();
+    }
+
+    @Override
+    public IteratorScope getIteratorScope() {
+      return IteratorScope.scan;
+    }
+
+    @Override
+    public boolean isFullMajorCompaction() {
+      return false;
+    }
+
+    private ArrayList<SortedKeyValueIterator<Key,Value>> topLevelIterators = new ArrayList<SortedKeyValueIterator<Key,Value>>();
+
+    @Override
+    public void registerSideChannel(SortedKeyValueIterator<Key,Value> iter) {
+      topLevelIterators.add(iter);
+    }
+
+    @Override
+    public Authorizations getAuthorizations() {
+      return authorizations;
+    }
+
+    SortedKeyValueIterator<Key,Value> getTopLevelIterator(SortedKeyValueIterator<Key,Value> iter) {
+      if (topLevelIterators.isEmpty())
+        return iter;
+      ArrayList<SortedKeyValueIterator<Key,Value>> allIters = new ArrayList<SortedKeyValueIterator<Key,Value>>(topLevelIterators);
+      allIters.add(iter);
+      return new MultiIterator(allIters, false);
+    }
+  }
+
+  private SortedKeyValueIterator<Key,Value> iter;
+  private Range range;
+  private KeyExtent currentExtent;
+  private Connector conn;
+  private String tableId;
+  private Authorizations authorizations;
+  private Instance instance;
+  private ScannerOptions options;
+  private ArrayList<SortedKeyValueIterator<Key,Value>> readers;
+  private AccumuloConfiguration config;
+
+  public OfflineIterator(ScannerOptions options, Instance instance, Credentials credentials, Authorizations authorizations, Text table, Range range) {
+    this.options = new ScannerOptions(options);
+    this.instance = instance;
+    this.range = range;
+
+    if (this.options.fetchedColumns.size() > 0) {
+      this.range = range.bound(this.options.fetchedColumns.first(), this.options.fetchedColumns.last());
+    }
+
+    this.tableId = table.toString();
+    this.authorizations = authorizations;
+    this.readers = new ArrayList<SortedKeyValueIterator<Key,Value>>();
+
+    try {
+      conn = instance.getConnector(credentials.getPrincipal(), credentials.getToken());
+      config = new ConfigurationCopy(conn.instanceOperations().getSiteConfiguration());
+      nextTablet();
+
+      while (iter != null && !iter.hasTop())
+        nextTablet();
+
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public boolean hasNext() {
+    return iter != null && iter.hasTop();
+  }
+
+  @Override
+  public Entry<Key,Value> next() {
+    try {
+      byte[] v = iter.getTopValue().get();
+      // copy just like tablet server does, do this before calling next
+      KeyValue ret = new KeyValue(new Key(iter.getTopKey()), Arrays.copyOf(v, v.length));
+
+      iter.next();
+
+      while (iter != null && !iter.hasTop())
+        nextTablet();
+
+      return ret;
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private void nextTablet() throws TableNotFoundException, AccumuloException, IOException {
+
+    Range nextRange = null;
+
+    if (currentExtent == null) {
+      Text startRow;
+
+      if (range.getStartKey() != null)
+        startRow = range.getStartKey().getRow();
+      else
+        startRow = new Text();
+
+      nextRange = new Range(new KeyExtent(new Text(tableId), startRow, null).getMetadataEntry(), true, null, false);
+    } else {
+
+      if (currentExtent.getEndRow() == null) {
+        iter = null;
+        return;
+      }
+
+      if (range.afterEndKey(new Key(currentExtent.getEndRow()).followingKey(PartialKey.ROW))) {
+        iter = null;
+        return;
+      }
+
+      nextRange = new Range(currentExtent.getMetadataEntry(), false, null, false);
+    }
+
+    List<String> relFiles = new ArrayList<String>();
+
+    Pair<KeyExtent,String> eloc = getTabletFiles(nextRange, relFiles);
+
+    while (eloc.getSecond() != null) {
+      if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) {
+        Tables.clearCache(instance);
+        if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) {
+          throw new AccumuloException("Table is online " + tableId + " cannot scan tablet in offline mode " + eloc.getFirst());
+        }
+      }
+
+      UtilWaitThread.sleep(250);
+
+      eloc = getTabletFiles(nextRange, relFiles);
+    }
+
+    KeyExtent extent = eloc.getFirst();
+
+    if (!extent.getTableId().toString().equals(tableId)) {
+      throw new AccumuloException(" did not find tablets for table " + tableId + " " + extent);
+    }
+
+    if (currentExtent != null && !extent.isPreviousExtent(currentExtent))
+      throw new AccumuloException(" " + currentExtent + " is not previous extent " + extent);
+
+    // Old property is only used to resolve relative paths into absolute paths. For systems upgraded
+    // with relative paths, it's assumed that correct instance.dfs.{uri,dir} is still correct in the configuration
+    @SuppressWarnings("deprecation")
+    String tablesDir = config.get(Property.INSTANCE_DFS_DIR) + Constants.HDFS_TABLES_DIR;
+
+    List<String> absFiles = new ArrayList<String>();
+    for (String relPath : relFiles) {
+      if (relPath.contains(":")) {
+        absFiles.add(relPath);
+      } else {
+        // handle old-style relative paths
+        if (relPath.startsWith("..")) {
+          absFiles.add(tablesDir + relPath.substring(2));
+        } else {
+          absFiles.add(tablesDir + "/" + tableId + relPath);
+        }
+      }
+    }
+
+    iter = createIterator(extent, absFiles);
+    iter.seek(range, LocalityGroupUtil.families(options.fetchedColumns), options.fetchedColumns.size() == 0 ? false : true);
+    currentExtent = extent;
+
+  }
+
+  private Pair<KeyExtent,String> getTabletFiles(Range nextRange, List<String> relFiles) throws TableNotFoundException {
+    Scanner scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+    scanner.setBatchSize(100);
+    scanner.setRange(nextRange);
+
+    RowIterator rowIter = new RowIterator(scanner);
+    Iterator<Entry<Key,Value>> row = rowIter.next();
+
+    KeyExtent extent = null;
+    String location = null;
+
+    while (row.hasNext()) {
+      Entry<Key,Value> entry = row.next();
+      Key key = entry.getKey();
+
+      if (key.getColumnFamily().equals(DataFileColumnFamily.NAME)) {
+        relFiles.add(key.getColumnQualifier().toString());
+      }
+
+      if (key.getColumnFamily().equals(TabletsSection.CurrentLocationColumnFamily.NAME)
+          || key.getColumnFamily().equals(TabletsSection.FutureLocationColumnFamily.NAME)) {
+        location = entry.getValue().toString();
+      }
+
+      if (TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key)) {
+        extent = new KeyExtent(key.getRow(), entry.getValue());
+      }
+
+    }
+    return new Pair<KeyExtent,String>(extent, location);
+  }
+
+  private SortedKeyValueIterator<Key,Value> createIterator(KeyExtent extent, List<String> absFiles) throws TableNotFoundException, AccumuloException,
+      IOException {
+
+    // TODO share code w/ tablet - ACCUMULO-1303
+    AccumuloConfiguration acuTableConf = AccumuloConfiguration.getTableConfiguration(conn, tableId);
+
+    Configuration conf = CachedConfiguration.getInstance();
+
+    for (SortedKeyValueIterator<Key,Value> reader : readers) {
+      ((FileSKVIterator) reader).close();
+    }
+
+    readers.clear();
+
+    // TODO need to close files - ACCUMULO-1303
+    for (String file : absFiles) {
+      FileSystem fs = VolumeConfiguration.getVolume(file, conf, config).getFileSystem();
+      FileSKVIterator reader = FileOperations.getInstance().openReader(file, false, fs, conf, acuTableConf, null, null);
+      readers.add(reader);
+    }
+
+    MultiIterator multiIter = new MultiIterator(readers, extent);
+
+    OfflineIteratorEnvironment iterEnv = new OfflineIteratorEnvironment(authorizations);
+
+    DeletingIterator delIter = new DeletingIterator(multiIter, false);
+
+    ColumnFamilySkippingIterator cfsi = new ColumnFamilySkippingIterator(delIter);
+
+    ColumnQualifierFilter colFilter = new ColumnQualifierFilter(cfsi, new HashSet<Column>(options.fetchedColumns));
+
+    byte[] defaultSecurityLabel;
+
+    ColumnVisibility cv = new ColumnVisibility(acuTableConf.get(Property.TABLE_DEFAULT_SCANTIME_VISIBILITY));
+    defaultSecurityLabel = cv.getExpression();
+
+    VisibilityFilter visFilter = new VisibilityFilter(colFilter, authorizations, defaultSecurityLabel);
+
+    return iterEnv.getTopLevelIterator(IteratorUtil.loadIterators(IteratorScope.scan, visFilter, extent, acuTableConf, options.serverSideIteratorList,
+        options.serverSideIteratorOptions, iterEnv, false));
+  }
+
+  @Override
+  public void remove() {
+    throw new UnsupportedOperationException();
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java b/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java
index 2f31319..427a7cc 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java
@@ -18,332 +18,18 @@ package org.apache.accumulo.core.client.impl;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Map.Entry;
 
 import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.client.RowIterator;
 import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.conf.AccumuloConfiguration;
-import org.apache.accumulo.core.conf.ConfigurationCopy;
-import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.Column;
 import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.KeyValue;
-import org.apache.accumulo.core.data.PartialKey;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.data.impl.KeyExtent;
-import org.apache.accumulo.core.file.FileOperations;
-import org.apache.accumulo.core.file.FileSKVIterator;
-import org.apache.accumulo.core.iterators.IteratorEnvironment;
-import org.apache.accumulo.core.iterators.IteratorUtil;
-import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
-import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
-import org.apache.accumulo.core.iterators.system.ColumnFamilySkippingIterator;
-import org.apache.accumulo.core.iterators.system.ColumnQualifierFilter;
-import org.apache.accumulo.core.iterators.system.DeletingIterator;
-import org.apache.accumulo.core.iterators.system.MultiIterator;
-import org.apache.accumulo.core.iterators.system.VisibilityFilter;
-import org.apache.accumulo.core.master.state.tables.TableState;
-import org.apache.accumulo.core.metadata.MetadataTable;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
 import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.security.ColumnVisibility;
-import org.apache.accumulo.core.util.CachedConfiguration;
-import org.apache.accumulo.core.util.LocalityGroupUtil;
-import org.apache.accumulo.core.util.Pair;
-import org.apache.accumulo.core.util.UtilWaitThread;
-import org.apache.accumulo.core.volume.VolumeConfiguration;
-import org.apache.commons.lang.NotImplementedException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.Text;
 
-class OfflineIterator implements Iterator<Entry<Key,Value>> {
-
-  static class OfflineIteratorEnvironment implements IteratorEnvironment {
-
-    private final Authorizations authorizations;
-
-    public OfflineIteratorEnvironment(Authorizations auths) {
-      this.authorizations = auths;
-    }
-
-    @Override
-    public SortedKeyValueIterator<Key,Value> reserveMapFileReader(String mapFileName) throws IOException {
-      throw new NotImplementedException();
-    }
-
-    @Override
-    public AccumuloConfiguration getConfig() {
-      return AccumuloConfiguration.getDefaultConfiguration();
-    }
-
-    @Override
-    public IteratorScope getIteratorScope() {
-      return IteratorScope.scan;
-    }
-
-    @Override
-    public boolean isFullMajorCompaction() {
-      return false;
-    }
-
-    private ArrayList<SortedKeyValueIterator<Key,Value>> topLevelIterators = new ArrayList<SortedKeyValueIterator<Key,Value>>();
-
-    @Override
-    public void registerSideChannel(SortedKeyValueIterator<Key,Value> iter) {
-      topLevelIterators.add(iter);
-    }
-
-    @Override
-    public Authorizations getAuthorizations() {
-      return authorizations;
-    }
-
-    SortedKeyValueIterator<Key,Value> getTopLevelIterator(SortedKeyValueIterator<Key,Value> iter) {
-      if (topLevelIterators.isEmpty())
-        return iter;
-      ArrayList<SortedKeyValueIterator<Key,Value>> allIters = new ArrayList<SortedKeyValueIterator<Key,Value>>(topLevelIterators);
-      allIters.add(iter);
-      return new MultiIterator(allIters, false);
-    }
-  }
-
-  private SortedKeyValueIterator<Key,Value> iter;
-  private Range range;
-  private KeyExtent currentExtent;
-  private Connector conn;
-  private String tableId;
-  private Authorizations authorizations;
-  private Instance instance;
-  private ScannerOptions options;
-  private ArrayList<SortedKeyValueIterator<Key,Value>> readers;
-  private AccumuloConfiguration config;
-
-  public OfflineIterator(ScannerOptions options, Instance instance, Credentials credentials, Authorizations authorizations, Text table, Range range) {
-    this.options = new ScannerOptions(options);
-    this.instance = instance;
-    this.range = range;
-
-    if (this.options.fetchedColumns.size() > 0) {
-      this.range = range.bound(this.options.fetchedColumns.first(), this.options.fetchedColumns.last());
-    }
-
-    this.tableId = table.toString();
-    this.authorizations = authorizations;
-    this.readers = new ArrayList<SortedKeyValueIterator<Key,Value>>();
-
-    try {
-      conn = instance.getConnector(credentials.getPrincipal(), credentials.getToken());
-      config = new ConfigurationCopy(conn.instanceOperations().getSiteConfiguration());
-      nextTablet();
-
-      while (iter != null && !iter.hasTop())
-        nextTablet();
-
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  @Override
-  public boolean hasNext() {
-    return iter != null && iter.hasTop();
-  }
-
-  @Override
-  public Entry<Key,Value> next() {
-    try {
-      byte[] v = iter.getTopValue().get();
-      // copy just like tablet server does, do this before calling next
-      KeyValue ret = new KeyValue(new Key(iter.getTopKey()), Arrays.copyOf(v, v.length));
-
-      iter.next();
-
-      while (iter != null && !iter.hasTop())
-        nextTablet();
-
-      return ret;
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  private void nextTablet() throws TableNotFoundException, AccumuloException, IOException {
-
-    Range nextRange = null;
-
-    if (currentExtent == null) {
-      Text startRow;
-
-      if (range.getStartKey() != null)
-        startRow = range.getStartKey().getRow();
-      else
-        startRow = new Text();
-
-      nextRange = new Range(new KeyExtent(new Text(tableId), startRow, null).getMetadataEntry(), true, null, false);
-    } else {
-
-      if (currentExtent.getEndRow() == null) {
-        iter = null;
-        return;
-      }
-
-      if (range.afterEndKey(new Key(currentExtent.getEndRow()).followingKey(PartialKey.ROW))) {
-        iter = null;
-        return;
-      }
-
-      nextRange = new Range(currentExtent.getMetadataEntry(), false, null, false);
-    }
-
-    List<String> relFiles = new ArrayList<String>();
-
-    Pair<KeyExtent,String> eloc = getTabletFiles(nextRange, relFiles);
-
-    while (eloc.getSecond() != null) {
-      if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) {
-        Tables.clearCache(instance);
-        if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) {
-          throw new AccumuloException("Table is online " + tableId + " cannot scan tablet in offline mode " + eloc.getFirst());
-        }
-      }
-
-      UtilWaitThread.sleep(250);
-
-      eloc = getTabletFiles(nextRange, relFiles);
-    }
-
-    KeyExtent extent = eloc.getFirst();
-
-    if (!extent.getTableId().toString().equals(tableId)) {
-      throw new AccumuloException(" did not find tablets for table " + tableId + " " + extent);
-    }
-
-    if (currentExtent != null && !extent.isPreviousExtent(currentExtent))
-      throw new AccumuloException(" " + currentExtent + " is not previous extent " + extent);
-
-    // Old property is only used to resolve relative paths into absolute paths. For systems upgraded
-    // with relative paths, it's assumed that correct instance.dfs.{uri,dir} is still correct in the configuration
-    @SuppressWarnings("deprecation")
-    String tablesDir = config.get(Property.INSTANCE_DFS_DIR) + Constants.HDFS_TABLES_DIR;
-
-    List<String> absFiles = new ArrayList<String>();
-    for (String relPath : relFiles) {
-      if (relPath.contains(":")) {
-        absFiles.add(relPath);
-      } else {
-        // handle old-style relative paths
-        if (relPath.startsWith("..")) {
-          absFiles.add(tablesDir + relPath.substring(2));
-        } else {
-          absFiles.add(tablesDir + "/" + tableId + relPath);
-        }
-      }
-    }
-
-    iter = createIterator(extent, absFiles);
-    iter.seek(range, LocalityGroupUtil.families(options.fetchedColumns), options.fetchedColumns.size() == 0 ? false : true);
-    currentExtent = extent;
-
-  }
-
-  private Pair<KeyExtent,String> getTabletFiles(Range nextRange, List<String> relFiles) throws TableNotFoundException {
-    Scanner scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
-    scanner.setBatchSize(100);
-    scanner.setRange(nextRange);
-
-    RowIterator rowIter = new RowIterator(scanner);
-    Iterator<Entry<Key,Value>> row = rowIter.next();
-
-    KeyExtent extent = null;
-    String location = null;
-
-    while (row.hasNext()) {
-      Entry<Key,Value> entry = row.next();
-      Key key = entry.getKey();
-
-      if (key.getColumnFamily().equals(DataFileColumnFamily.NAME)) {
-        relFiles.add(key.getColumnQualifier().toString());
-      }
-
-      if (key.getColumnFamily().equals(TabletsSection.CurrentLocationColumnFamily.NAME)
-          || key.getColumnFamily().equals(TabletsSection.FutureLocationColumnFamily.NAME)) {
-        location = entry.getValue().toString();
-      }
-
-      if (TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key)) {
-        extent = new KeyExtent(key.getRow(), entry.getValue());
-      }
-
-    }
-    return new Pair<KeyExtent,String>(extent, location);
-  }
-
-  private SortedKeyValueIterator<Key,Value> createIterator(KeyExtent extent, List<String> absFiles) throws TableNotFoundException, AccumuloException,
-      IOException {
-
-    // TODO share code w/ tablet - ACCUMULO-1303
-    AccumuloConfiguration acuTableConf = AccumuloConfiguration.getTableConfiguration(conn, tableId);
-
-    Configuration conf = CachedConfiguration.getInstance();
-
-    for (SortedKeyValueIterator<Key,Value> reader : readers) {
-      ((FileSKVIterator) reader).close();
-    }
-
-    readers.clear();
-
-    // TODO need to close files - ACCUMULO-1303
-    for (String file : absFiles) {
-      FileSystem fs = VolumeConfiguration.getVolume(file, conf, config).getFileSystem();
-      FileSKVIterator reader = FileOperations.getInstance().openReader(file, false, fs, conf, acuTableConf, null, null);
-      readers.add(reader);
-    }
-
-    MultiIterator multiIter = new MultiIterator(readers, extent);
-
-    OfflineIteratorEnvironment iterEnv = new OfflineIteratorEnvironment(authorizations);
-
-    DeletingIterator delIter = new DeletingIterator(multiIter, false);
-
-    ColumnFamilySkippingIterator cfsi = new ColumnFamilySkippingIterator(delIter);
-
-    ColumnQualifierFilter colFilter = new ColumnQualifierFilter(cfsi, new HashSet<Column>(options.fetchedColumns));
-
-    byte[] defaultSecurityLabel;
-
-    ColumnVisibility cv = new ColumnVisibility(acuTableConf.get(Property.TABLE_DEFAULT_SCANTIME_VISIBILITY));
-    defaultSecurityLabel = cv.getExpression();
-
-    VisibilityFilter visFilter = new VisibilityFilter(colFilter, authorizations, defaultSecurityLabel);
-
-    return iterEnv.getTopLevelIterator(IteratorUtil.loadIterators(IteratorScope.scan, visFilter, extent, acuTableConf, options.serverSideIteratorList,
-        options.serverSideIteratorOptions, iterEnv, false));
-  }
-
-  @Override
-  public void remove() {
-    throw new UnsupportedOperationException();
-  }
-
-}
-
-/**
- *
- */
 public class OfflineScanner extends ScannerOptions implements Scanner {
 
   private int batchSize;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/core/src/main/java/org/apache/accumulo/core/compaction/CompactionSettings.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/compaction/CompactionSettings.java b/core/src/main/java/org/apache/accumulo/core/compaction/CompactionSettings.java
index a45a692..43f8c0f 100644
--- a/core/src/main/java/org/apache/accumulo/core/compaction/CompactionSettings.java
+++ b/core/src/main/java/org/apache/accumulo/core/compaction/CompactionSettings.java
@@ -18,48 +18,6 @@
 package org.apache.accumulo.core.compaction;
 
 import java.util.Map;
-import java.util.regex.Pattern;
-
-import org.apache.accumulo.core.conf.AccumuloConfiguration;
-
-import com.google.common.base.Preconditions;
-
-interface Type {
-  String convert(String str);
-}
-
-class SizeType implements Type {
-  @Override
-  public String convert(String str) {
-    long size = AccumuloConfiguration.getMemoryInBytes(str);
-    Preconditions.checkArgument(size > 0);
-    return Long.toString(size);
-  }
-}
-
-class PatternType implements Type {
-  @Override
-  public String convert(String str) {
-    // ensure it compiles
-    Pattern.compile(str);
-    return str;
-  }
-}
-
-class UIntType implements Type {
-  @Override
-  public String convert(String str) {
-    Preconditions.checkArgument(Integer.parseInt(str) > 0);
-    return str;
-  }
-}
-
-class StringType implements Type {
-  @Override
-  public String convert(String str) {
-    return str;
-  }
-}
 
 public enum CompactionSettings {
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/core/src/main/java/org/apache/accumulo/core/compaction/PatternType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/compaction/PatternType.java b/core/src/main/java/org/apache/accumulo/core/compaction/PatternType.java
new file mode 100644
index 0000000..c52dcb4
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/compaction/PatternType.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.compaction;
+
+import java.util.regex.Pattern;
+
+class PatternType implements Type {
+  @Override
+  public String convert(String str) {
+    // ensure it compiles
+    Pattern.compile(str);
+    return str;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/core/src/main/java/org/apache/accumulo/core/compaction/SizeType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/compaction/SizeType.java b/core/src/main/java/org/apache/accumulo/core/compaction/SizeType.java
new file mode 100644
index 0000000..c2af401
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/compaction/SizeType.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.compaction;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+
+import com.google.common.base.Preconditions;
+
+class SizeType implements Type {
+  @Override
+  public String convert(String str) {
+    long size = AccumuloConfiguration.getMemoryInBytes(str);
+    Preconditions.checkArgument(size > 0);
+    return Long.toString(size);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/core/src/main/java/org/apache/accumulo/core/compaction/StringType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/compaction/StringType.java b/core/src/main/java/org/apache/accumulo/core/compaction/StringType.java
new file mode 100644
index 0000000..7098a5c
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/compaction/StringType.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.compaction;
+
+class StringType implements Type {
+  @Override
+  public String convert(String str) {
+    return str;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/core/src/main/java/org/apache/accumulo/core/compaction/Type.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/compaction/Type.java b/core/src/main/java/org/apache/accumulo/core/compaction/Type.java
new file mode 100644
index 0000000..d8f81a6
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/compaction/Type.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.compaction;
+
+interface Type {
+  String convert(String str);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/core/src/main/java/org/apache/accumulo/core/compaction/UIntType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/compaction/UIntType.java b/core/src/main/java/org/apache/accumulo/core/compaction/UIntType.java
new file mode 100644
index 0000000..c8880fc
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/compaction/UIntType.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.compaction;
+
+import com.google.common.base.Preconditions;
+
+class UIntType implements Type {
+  @Override
+  public String convert(String str) {
+    Preconditions.checkArgument(Integer.parseInt(str) > 0);
+    return str;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/core/src/main/java/org/apache/accumulo/core/file/DispatchingFileFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/DispatchingFileFactory.java b/core/src/main/java/org/apache/accumulo/core/file/DispatchingFileFactory.java
new file mode 100644
index 0000000..128a931
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/file/DispatchingFileFactory.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file;
+
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
+import org.apache.accumulo.core.file.map.MapFileOperations;
+import org.apache.accumulo.core.file.rfile.RFile;
+import org.apache.accumulo.core.file.rfile.RFileOperations;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+class DispatchingFileFactory extends FileOperations {
+
+  private FileOperations findFileFactory(String file) {
+
+    Path p = new Path(file);
+    String name = p.getName();
+
+    if (name.startsWith(Constants.MAPFILE_EXTENSION + "_")) {
+      return new MapFileOperations();
+    }
+    String[] sp = name.split("\\.");
+
+    if (sp.length < 2) {
+      throw new IllegalArgumentException("File name " + name + " has no extension");
+    }
+
+    String extension = sp[sp.length - 1];
+
+    if (extension.equals(Constants.MAPFILE_EXTENSION) || extension.equals(Constants.MAPFILE_EXTENSION + "_tmp")) {
+      return new MapFileOperations();
+    } else if (extension.equals(RFile.EXTENSION) || extension.equals(RFile.EXTENSION + "_tmp")) {
+      return new RFileOperations();
+    } else {
+      throw new IllegalArgumentException("File type " + extension + " not supported");
+    }
+  }
+
+  @Override
+  public FileSKVIterator openIndex(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException {
+    return findFileFactory(file).openIndex(file, fs, conf, acuconf, null, null);
+  }
+
+  @Override
+  public FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException {
+    FileSKVIterator iter = findFileFactory(file).openReader(file, seekToBeginning, fs, conf, acuconf, null, null);
+    if (acuconf.getBoolean(Property.TABLE_BLOOM_ENABLED)) {
+      return new BloomFilterLayer.Reader(iter, acuconf);
+    }
+    return iter;
+  }
+
+  @Override
+  public FileSKVWriter openWriter(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException {
+    FileSKVWriter writer = findFileFactory(file).openWriter(file, fs, conf, acuconf);
+    if (acuconf.getBoolean(Property.TABLE_BLOOM_ENABLED)) {
+      return new BloomFilterLayer.Writer(writer, acuconf);
+    }
+    return writer;
+  }
+
+  @Override
+  public long getFileSize(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException {
+    return findFileFactory(file).getFileSize(file, fs, conf, acuconf);
+  }
+
+  @Override
+  public FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,
+      AccumuloConfiguration tableConf) throws IOException {
+    return findFileFactory(file).openReader(file, range, columnFamilies, inclusive, fs, conf, tableConf, null, null);
+  }
+
+  @Override
+  public FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,
+      AccumuloConfiguration tableConf, BlockCache dataCache, BlockCache indexCache) throws IOException {
+
+    if (!tableConf.getBoolean(Property.TABLE_INDEXCACHE_ENABLED))
+      indexCache = null;
+    if (!tableConf.getBoolean(Property.TABLE_BLOCKCACHE_ENABLED))
+      dataCache = null;
+
+    return findFileFactory(file).openReader(file, range, columnFamilies, inclusive, fs, conf, tableConf, dataCache, indexCache);
+  }
+
+  @Override
+  public FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf,
+      BlockCache dataCache, BlockCache indexCache) throws IOException {
+
+    if (!acuconf.getBoolean(Property.TABLE_INDEXCACHE_ENABLED))
+      indexCache = null;
+    if (!acuconf.getBoolean(Property.TABLE_BLOCKCACHE_ENABLED))
+      dataCache = null;
+
+    FileSKVIterator iter = findFileFactory(file).openReader(file, seekToBeginning, fs, conf, acuconf, dataCache, indexCache);
+    if (acuconf.getBoolean(Property.TABLE_BLOOM_ENABLED)) {
+      return new BloomFilterLayer.Reader(iter, acuconf);
+    }
+    return iter;
+  }
+
+  @Override
+  public FileSKVIterator openIndex(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf, BlockCache dCache, BlockCache iCache)
+      throws IOException {
+
+    if (!acuconf.getBoolean(Property.TABLE_INDEXCACHE_ENABLED))
+      iCache = null;
+    if (!acuconf.getBoolean(Property.TABLE_BLOCKCACHE_ENABLED))
+      dCache = null;
+
+    return findFileFactory(file).openIndex(file, fs, conf, acuconf, dCache, iCache);
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java b/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java
index 78d0407..3798453 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java
@@ -27,115 +27,9 @@ import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.ByteSequence;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
-import org.apache.accumulo.core.file.map.MapFileOperations;
 import org.apache.accumulo.core.file.rfile.RFile;
-import org.apache.accumulo.core.file.rfile.RFileOperations;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-class DispatchingFileFactory extends FileOperations {
-
-  private FileOperations findFileFactory(String file) {
-
-    Path p = new Path(file);
-    String name = p.getName();
-
-    if (name.startsWith(Constants.MAPFILE_EXTENSION + "_")) {
-      return new MapFileOperations();
-    }
-    String[] sp = name.split("\\.");
-
-    if (sp.length < 2) {
-      throw new IllegalArgumentException("File name " + name + " has no extension");
-    }
-
-    String extension = sp[sp.length - 1];
-
-    if (extension.equals(Constants.MAPFILE_EXTENSION) || extension.equals(Constants.MAPFILE_EXTENSION + "_tmp")) {
-      return new MapFileOperations();
-    } else if (extension.equals(RFile.EXTENSION) || extension.equals(RFile.EXTENSION + "_tmp")) {
-      return new RFileOperations();
-    } else {
-      throw new IllegalArgumentException("File type " + extension + " not supported");
-    }
-  }
-
-  @Override
-  public FileSKVIterator openIndex(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException {
-    return findFileFactory(file).openIndex(file, fs, conf, acuconf, null, null);
-  }
-
-  @Override
-  public FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException {
-    FileSKVIterator iter = findFileFactory(file).openReader(file, seekToBeginning, fs, conf, acuconf, null, null);
-    if (acuconf.getBoolean(Property.TABLE_BLOOM_ENABLED)) {
-      return new BloomFilterLayer.Reader(iter, acuconf);
-    }
-    return iter;
-  }
-
-  @Override
-  public FileSKVWriter openWriter(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException {
-    FileSKVWriter writer = findFileFactory(file).openWriter(file, fs, conf, acuconf);
-    if (acuconf.getBoolean(Property.TABLE_BLOOM_ENABLED)) {
-      return new BloomFilterLayer.Writer(writer, acuconf);
-    }
-    return writer;
-  }
-
-  @Override
-  public long getFileSize(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException {
-    return findFileFactory(file).getFileSize(file, fs, conf, acuconf);
-  }
-
-  @Override
-  public FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,
-      AccumuloConfiguration tableConf) throws IOException {
-    return findFileFactory(file).openReader(file, range, columnFamilies, inclusive, fs, conf, tableConf, null, null);
-  }
-
-  @Override
-  public FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,
-      AccumuloConfiguration tableConf, BlockCache dataCache, BlockCache indexCache) throws IOException {
-
-    if (!tableConf.getBoolean(Property.TABLE_INDEXCACHE_ENABLED))
-      indexCache = null;
-    if (!tableConf.getBoolean(Property.TABLE_BLOCKCACHE_ENABLED))
-      dataCache = null;
-
-    return findFileFactory(file).openReader(file, range, columnFamilies, inclusive, fs, conf, tableConf, dataCache, indexCache);
-  }
-
-  @Override
-  public FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf,
-      BlockCache dataCache, BlockCache indexCache) throws IOException {
-
-    if (!acuconf.getBoolean(Property.TABLE_INDEXCACHE_ENABLED))
-      indexCache = null;
-    if (!acuconf.getBoolean(Property.TABLE_BLOCKCACHE_ENABLED))
-      dataCache = null;
-
-    FileSKVIterator iter = findFileFactory(file).openReader(file, seekToBeginning, fs, conf, acuconf, dataCache, indexCache);
-    if (acuconf.getBoolean(Property.TABLE_BLOOM_ENABLED)) {
-      return new BloomFilterLayer.Reader(iter, acuconf);
-    }
-    return iter;
-  }
-
-  @Override
-  public FileSKVIterator openIndex(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf, BlockCache dCache, BlockCache iCache)
-      throws IOException {
-
-    if (!acuconf.getBoolean(Property.TABLE_INDEXCACHE_ENABLED))
-      iCache = null;
-    if (!acuconf.getBoolean(Property.TABLE_BLOCKCACHE_ENABLED))
-      dCache = null;
-
-    return findFileFactory(file).openIndex(file, fs, conf, acuconf, dCache, iCache);
-  }
-
-}
 
 public abstract class FileOperations {
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/core/src/test/java/org/apache/accumulo/core/cli/TestClientOpts.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/cli/TestClientOpts.java b/core/src/test/java/org/apache/accumulo/core/cli/TestClientOpts.java
index f0fdcca..65df5c9 100644
--- a/core/src/test/java/org/apache/accumulo/core/cli/TestClientOpts.java
+++ b/core/src/test/java/org/apache/accumulo/core/cli/TestClientOpts.java
@@ -263,5 +263,10 @@ public class TestClientOpts {
     public boolean equals(Object o) {
       return o instanceof EmptyToken;
     }
+
+    @Override
+    public int hashCode() {
+      return 0;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/examples/simple/src/main/java/org/apache/accumulo/examples/simple/client/CountingVerifyingReceiver.java
----------------------------------------------------------------------
diff --git a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/client/CountingVerifyingReceiver.java b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/client/CountingVerifyingReceiver.java
new file mode 100644
index 0000000..873f886
--- /dev/null
+++ b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/client/CountingVerifyingReceiver.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.simple.client;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.util.Arrays;
+import java.util.HashMap;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Internal class used to verify validity of data read.
+ */
+class CountingVerifyingReceiver {
+  private static final Logger log = LoggerFactory.getLogger(CountingVerifyingReceiver.class);
+
+  long count = 0;
+  int expectedValueSize = 0;
+  HashMap<Text,Boolean> expectedRows;
+
+  CountingVerifyingReceiver(HashMap<Text,Boolean> expectedRows, int expectedValueSize) {
+    this.expectedRows = expectedRows;
+    this.expectedValueSize = expectedValueSize;
+  }
+
+  public void receive(Key key, Value value) {
+
+    String row = key.getRow().toString();
+    long rowid = Integer.parseInt(row.split("_")[1]);
+
+    byte expectedValue[] = RandomBatchWriter.createValue(rowid, expectedValueSize);
+
+    if (!Arrays.equals(expectedValue, value.get())) {
+      log.error("Got unexpected value for " + key + " expected : " + new String(expectedValue, UTF_8) + " got : " + new String(value.get(), UTF_8));
+    }
+
+    if (!expectedRows.containsKey(key.getRow())) {
+      log.error("Got unexpected key " + key);
+    } else {
+      expectedRows.put(key.getRow(), true);
+    }
+
+    count++;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/examples/simple/src/main/java/org/apache/accumulo/examples/simple/client/RandomBatchScanner.java
----------------------------------------------------------------------
diff --git a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/client/RandomBatchScanner.java b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/client/RandomBatchScanner.java
index 6f8b485..a43b97d 100644
--- a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/client/RandomBatchScanner.java
+++ b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/client/RandomBatchScanner.java
@@ -16,10 +16,8 @@
  */
 package org.apache.accumulo.examples.simple.client;
 
-import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.accumulo.examples.simple.client.RandomBatchWriter.abs;
 
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map.Entry;
@@ -43,42 +41,6 @@ import org.slf4j.LoggerFactory;
 import com.beust.jcommander.Parameter;
 
 /**
- * Internal class used to verify validity of data read.
- */
-class CountingVerifyingReceiver {
-  private static final Logger log = LoggerFactory.getLogger(CountingVerifyingReceiver.class);
-
-  long count = 0;
-  int expectedValueSize = 0;
-  HashMap<Text,Boolean> expectedRows;
-
-  CountingVerifyingReceiver(HashMap<Text,Boolean> expectedRows, int expectedValueSize) {
-    this.expectedRows = expectedRows;
-    this.expectedValueSize = expectedValueSize;
-  }
-
-  public void receive(Key key, Value value) {
-
-    String row = key.getRow().toString();
-    long rowid = Integer.parseInt(row.split("_")[1]);
-
-    byte expectedValue[] = RandomBatchWriter.createValue(rowid, expectedValueSize);
-
-    if (!Arrays.equals(expectedValue, value.get())) {
-      log.error("Got unexpected value for " + key + " expected : " + new String(expectedValue, UTF_8) + " got : " + new String(value.get(), UTF_8));
-    }
-
-    if (!expectedRows.containsKey(key.getRow())) {
-      log.error("Got unexpected key " + key);
-    } else {
-      expectedRows.put(key.getRow(), true);
-    }
-
-    count++;
-  }
-}
-
-/**
  * Simple example for reading random batches of data from Accumulo. See docs/examples/README.batch for instructions.
  */
 public class RandomBatchScanner {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 0bcc689..f680f84 100644
--- a/pom.xml
+++ b/pom.xml
@@ -946,6 +946,7 @@
                 <property name="eachLine" value="true" />
               </module>
               <module name="TreeWalker">
+                <module name="OneTopLevelClass" />
                 <module name="RegexpSinglelineJava">
                   <property name="format" value="\s+$" />
                   <property name="message" value="Line has trailing whitespace." />

http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
index 7f83988..031a80c 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
@@ -16,71 +16,34 @@
  */
 package org.apache.accumulo.master.tableOps;
 
-import static java.nio.charset.StandardCharsets.UTF_8;
-
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
 import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Map.Entry;
-import java.util.Set;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.IsolatedScanner;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.impl.ServerClient;
 import org.apache.accumulo.core.client.impl.Tables;
-import org.apache.accumulo.core.client.impl.thrift.ClientService;
-import org.apache.accumulo.core.client.impl.thrift.ClientService.Client;
 import org.apache.accumulo.core.client.impl.thrift.TableOperation;
 import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType;
 import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
-import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.data.impl.KeyExtent;
 import org.apache.accumulo.core.file.FileOperations;
 import org.apache.accumulo.core.master.state.tables.TableState;
-import org.apache.accumulo.core.metadata.MetadataTable;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.trace.Tracer;
-import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.core.util.SimpleThreadPool;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.fate.Repo;
 import org.apache.accumulo.master.Master;
 import org.apache.accumulo.server.ServerConstants;
-import org.apache.accumulo.server.fs.FileRef;
 import org.apache.accumulo.server.fs.VolumeManager;
-import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection;
-import org.apache.accumulo.server.master.state.TServerInstance;
 import org.apache.accumulo.server.tablets.UniqueNameAllocator;
 import org.apache.accumulo.server.util.MetadataTableUtil;
-import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
 import org.apache.accumulo.server.zookeeper.TransactionWatcher.ZooArbitrator;
-import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.MapFile;
-import org.apache.hadoop.io.Text;
-import org.apache.htrace.wrappers.TraceExecutorService;
-import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -302,329 +265,3 @@ public class BulkImport extends MasterRepo {
     Utils.getReadLock(tableId, tid).unlock();
   }
 }
-
-class CleanUpBulkImport extends MasterRepo {
-
-  private static final long serialVersionUID = 1L;
-
-  private static final Logger log = LoggerFactory.getLogger(CleanUpBulkImport.class);
-
-  private String tableId;
-  private String source;
-  private String bulk;
-  private String error;
-
-  public CleanUpBulkImport(String tableId, String source, String bulk, String error) {
-    this.tableId = tableId;
-    this.source = source;
-    this.bulk = bulk;
-    this.error = error;
-  }
-
-  @Override
-  public Repo<Master> call(long tid, Master master) throws Exception {
-    log.debug("removing the bulk processing flag file in " + bulk);
-    Path bulkDir = new Path(bulk);
-    MetadataTableUtil.removeBulkLoadInProgressFlag(master, "/" + bulkDir.getParent().getName() + "/" + bulkDir.getName());
-    MetadataTableUtil.addDeleteEntry(master, tableId, bulkDir.toString());
-    log.debug("removing the metadata table markers for loaded files");
-    Connector conn = master.getConnector();
-    MetadataTableUtil.removeBulkLoadEntries(conn, tableId, tid);
-    log.debug("releasing HDFS reservations for " + source + " and " + error);
-    Utils.unreserveHdfsDirectory(source, tid);
-    Utils.unreserveHdfsDirectory(error, tid);
-    Utils.getReadLock(tableId, tid).unlock();
-    log.debug("completing bulk import transaction " + tid);
-    ZooArbitrator.cleanup(Constants.BULK_ARBITRATOR_TYPE, tid);
-    return null;
-  }
-}
-
-class CompleteBulkImport extends MasterRepo {
-
-  private static final long serialVersionUID = 1L;
-
-  private String tableId;
-  private String source;
-  private String bulk;
-  private String error;
-
-  public CompleteBulkImport(String tableId, String source, String bulk, String error) {
-    this.tableId = tableId;
-    this.source = source;
-    this.bulk = bulk;
-    this.error = error;
-  }
-
-  @Override
-  public Repo<Master> call(long tid, Master master) throws Exception {
-    ZooArbitrator.stop(Constants.BULK_ARBITRATOR_TYPE, tid);
-    return new CopyFailed(tableId, source, bulk, error);
-  }
-}
-
-class CopyFailed extends MasterRepo {
-
-  private static final long serialVersionUID = 1L;
-
-  private String tableId;
-  private String source;
-  private String bulk;
-  private String error;
-
-  public CopyFailed(String tableId, String source, String bulk, String error) {
-    this.tableId = tableId;
-    this.source = source;
-    this.bulk = bulk;
-    this.error = error;
-  }
-
-  @Override
-  public long isReady(long tid, Master master) throws Exception {
-    Set<TServerInstance> finished = new HashSet<TServerInstance>();
-    Set<TServerInstance> running = master.onlineTabletServers();
-    for (TServerInstance server : running) {
-      try {
-        TServerConnection client = master.getConnection(server);
-        if (client != null && !client.isActive(tid))
-          finished.add(server);
-      } catch (TException ex) {
-        log.info("Ignoring error trying to check on tid " + tid + " from server " + server + ": " + ex);
-      }
-    }
-    if (finished.containsAll(running))
-      return 0;
-    return 500;
-  }
-
-  @Override
-  public Repo<Master> call(long tid, Master master) throws Exception {
-    // This needs to execute after the arbiter is stopped
-
-    VolumeManager fs = master.getFileSystem();
-
-    if (!fs.exists(new Path(error, BulkImport.FAILURES_TXT)))
-      return new CleanUpBulkImport(tableId, source, bulk, error);
-
-    HashMap<FileRef,String> failures = new HashMap<FileRef,String>();
-    HashMap<FileRef,String> loadedFailures = new HashMap<FileRef,String>();
-
-    try (BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(new Path(error, BulkImport.FAILURES_TXT)), UTF_8))) {
-      String line = null;
-      while ((line = in.readLine()) != null) {
-        Path path = new Path(line);
-        if (!fs.exists(new Path(error, path.getName())))
-          failures.put(new FileRef(line, path), line);
-      }
-    }
-
-    /*
-     * I thought I could move files that have no file references in the table. However its possible a clone references a file. Therefore only move files that
-     * have no loaded markers.
-     */
-
-    // determine which failed files were loaded
-    Connector conn = master.getConnector();
-    Scanner mscanner = new IsolatedScanner(conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY));
-    mscanner.setRange(new KeyExtent(new Text(tableId), null, null).toMetadataRange());
-    mscanner.fetchColumnFamily(TabletsSection.BulkFileColumnFamily.NAME);
-
-    for (Entry<Key,Value> entry : mscanner) {
-      if (Long.parseLong(entry.getValue().toString()) == tid) {
-        FileRef loadedFile = new FileRef(fs, entry.getKey());
-        String absPath = failures.remove(loadedFile);
-        if (absPath != null) {
-          loadedFailures.put(loadedFile, absPath);
-        }
-      }
-    }
-
-    // move failed files that were not loaded
-    for (String failure : failures.values()) {
-      Path orig = new Path(failure);
-      Path dest = new Path(error, orig.getName());
-      fs.rename(orig, dest);
-      log.debug("tid " + tid + " renamed " + orig + " to " + dest + ": import failed");
-    }
-
-    if (loadedFailures.size() > 0) {
-      DistributedWorkQueue bifCopyQueue = new DistributedWorkQueue(Constants.ZROOT + "/" + master.getInstance().getInstanceID() + Constants.ZBULK_FAILED_COPYQ,
-          master.getConfiguration());
-
-      HashSet<String> workIds = new HashSet<String>();
-
-      for (String failure : loadedFailures.values()) {
-        Path orig = new Path(failure);
-        Path dest = new Path(error, orig.getName());
-
-        if (fs.exists(dest))
-          continue;
-
-        bifCopyQueue.addWork(orig.getName(), (failure + "," + dest).getBytes(UTF_8));
-        workIds.add(orig.getName());
-        log.debug("tid " + tid + " added to copyq: " + orig + " to " + dest + ": failed");
-      }
-
-      bifCopyQueue.waitUntilDone(workIds);
-    }
-
-    fs.deleteRecursively(new Path(error, BulkImport.FAILURES_TXT));
-    return new CleanUpBulkImport(tableId, source, bulk, error);
-  }
-
-}
-
-class LoadFiles extends MasterRepo {
-
-  private static final long serialVersionUID = 1L;
-
-  private static ExecutorService threadPool = null;
-  private static final Logger log = LoggerFactory.getLogger(BulkImport.class);
-
-  private String tableId;
-  private String source;
-  private String bulk;
-  private String errorDir;
-  private boolean setTime;
-
-  public LoadFiles(String tableId, String source, String bulk, String errorDir, boolean setTime) {
-    this.tableId = tableId;
-    this.source = source;
-    this.bulk = bulk;
-    this.errorDir = errorDir;
-    this.setTime = setTime;
-  }
-
-  @Override
-  public long isReady(long tid, Master master) throws Exception {
-    if (master.onlineTabletServers().size() == 0)
-      return 500;
-    return 0;
-  }
-
-  private static synchronized ExecutorService getThreadPool(Master master) {
-    if (threadPool == null) {
-      int threadPoolSize = master.getConfiguration().getCount(Property.MASTER_BULK_THREADPOOL_SIZE);
-      ThreadPoolExecutor pool = new SimpleThreadPool(threadPoolSize, "bulk import");
-      pool.allowCoreThreadTimeOut(true);
-      threadPool = new TraceExecutorService(pool);
-    }
-    return threadPool;
-  }
-
-  @Override
-  public Repo<Master> call(final long tid, final Master master) throws Exception {
-    ExecutorService executor = getThreadPool(master);
-    final AccumuloConfiguration conf = master.getConfiguration();
-    VolumeManager fs = master.getFileSystem();
-    List<FileStatus> files = new ArrayList<FileStatus>();
-    for (FileStatus entry : fs.listStatus(new Path(bulk))) {
-      files.add(entry);
-    }
-    log.debug("tid " + tid + " importing " + files.size() + " files");
-
-    Path writable = new Path(this.errorDir, ".iswritable");
-    if (!fs.createNewFile(writable)) {
-      // Maybe this is a re-try... clear the flag and try again
-      fs.delete(writable);
-      if (!fs.createNewFile(writable))
-        throw new ThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT, TableOperationExceptionType.BULK_BAD_ERROR_DIRECTORY,
-            "Unable to write to " + this.errorDir);
-    }
-    fs.delete(writable);
-
-    final Set<String> filesToLoad = Collections.synchronizedSet(new HashSet<String>());
-    for (FileStatus f : files)
-      filesToLoad.add(f.getPath().toString());
-
-    final int RETRIES = Math.max(1, conf.getCount(Property.MASTER_BULK_RETRIES));
-    for (int attempt = 0; attempt < RETRIES && filesToLoad.size() > 0; attempt++) {
-      List<Future<List<String>>> results = new ArrayList<Future<List<String>>>();
-
-      if (master.onlineTabletServers().size() == 0)
-        log.warn("There are no tablet server to process bulk import, waiting (tid = " + tid + ")");
-
-      while (master.onlineTabletServers().size() == 0) {
-        UtilWaitThread.sleep(500);
-      }
-
-      // Use the threadpool to assign files one-at-a-time to the server
-      final List<String> loaded = Collections.synchronizedList(new ArrayList<String>());
-      for (final String file : filesToLoad) {
-        results.add(executor.submit(new Callable<List<String>>() {
-          @Override
-          public List<String> call() {
-            List<String> failures = new ArrayList<String>();
-            ClientService.Client client = null;
-            String server = null;
-            try {
-              // get a connection to a random tablet server, do not prefer cached connections because
-              // this is running on the master and there are lots of connections to tablet servers
-              // serving the metadata tablets
-              long timeInMillis = master.getConfiguration().getTimeInMillis(Property.MASTER_BULK_TIMEOUT);
-              Pair<String,Client> pair = ServerClient.getConnection(master, false, timeInMillis);
-              client = pair.getSecond();
-              server = pair.getFirst();
-              List<String> attempt = Collections.singletonList(file);
-              log.debug("Asking " + pair.getFirst() + " to bulk import " + file);
-              List<String> fail = client.bulkImportFiles(Tracer.traceInfo(), master.rpcCreds(), tid, tableId, attempt, errorDir, setTime);
-              if (fail.isEmpty()) {
-                loaded.add(file);
-              } else {
-                failures.addAll(fail);
-              }
-            } catch (Exception ex) {
-              log.error("rpc failed server:" + server + ", tid:" + tid + " " + ex);
-            } finally {
-              ServerClient.close(client);
-            }
-            return failures;
-          }
-        }));
-      }
-      Set<String> failures = new HashSet<String>();
-      for (Future<List<String>> f : results)
-        failures.addAll(f.get());
-      filesToLoad.removeAll(loaded);
-      if (filesToLoad.size() > 0) {
-        log.debug("tid " + tid + " attempt " + (attempt + 1) + " " + sampleList(filesToLoad, 10) + " failed");
-        UtilWaitThread.sleep(100);
-      }
-    }
-
-    FSDataOutputStream failFile = fs.create(new Path(errorDir, BulkImport.FAILURES_TXT), true);
-    BufferedWriter out = new BufferedWriter(new OutputStreamWriter(failFile, UTF_8));
-    try {
-      for (String f : filesToLoad) {
-        out.write(f);
-        out.write("\n");
-      }
-    } finally {
-      out.close();
-    }
-
-    // return the next step, which will perform cleanup
-    return new CompleteBulkImport(tableId, source, bulk, errorDir);
-  }
-
-  static String sampleList(Collection<?> potentiallyLongList, int max) {
-    StringBuffer result = new StringBuffer();
-    result.append("[");
-    int i = 0;
-    for (Object obj : potentiallyLongList) {
-      result.append(obj);
-      if (i >= max) {
-        result.append("...");
-        break;
-      } else {
-        result.append(", ");
-      }
-      i++;
-    }
-    if (i < max)
-      result.delete(result.length() - 2, result.length());
-    result.append("]");
-    return result.toString();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/server/master/src/main/java/org/apache/accumulo/master/tableOps/CancelCompactions.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CancelCompactions.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CancelCompactions.java
index 4f4b27e..e268f17 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CancelCompactions.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CancelCompactions.java
@@ -27,29 +27,6 @@ import org.apache.accumulo.fate.zookeeper.IZooReaderWriter.Mutator;
 import org.apache.accumulo.master.Master;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 
-class FinishCancelCompaction extends MasterRepo {
-  private static final long serialVersionUID = 1L;
-  private String tableId;
-
-  public FinishCancelCompaction(String tableId) {
-    this.tableId = tableId;
-  }
-
-  @Override
-  public Repo<Master> call(long tid, Master environment) throws Exception {
-    Utils.getReadLock(tableId, tid).unlock();
-    return null;
-  }
-
-  @Override
-  public void undo(long tid, Master environment) throws Exception {
-
-  }
-}
-
-/**
- *
- */
 public class CancelCompactions extends MasterRepo {
 
   private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/server/master/src/main/java/org/apache/accumulo/master/tableOps/ChooseDir.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/ChooseDir.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/ChooseDir.java
new file mode 100644
index 0000000..3e1aa33
--- /dev/null
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/ChooseDir.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master.tableOps;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.fate.Repo;
+import org.apache.accumulo.master.Master;
+import org.apache.accumulo.server.ServerConstants;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Optional;
+
+class ChooseDir extends MasterRepo {
+  private static final long serialVersionUID = 1L;
+
+  private TableInfo tableInfo;
+
+  ChooseDir(TableInfo ti) {
+    this.tableInfo = ti;
+  }
+
+  @Override
+  public long isReady(long tid, Master environment) throws Exception {
+    return 0;
+  }
+
+  @Override
+  public Repo<Master> call(long tid, Master master) throws Exception {
+    // Constants.DEFAULT_TABLET_LOCATION has a leading slash prepended to it so we don't need to add one here
+    tableInfo.dir = master.getFileSystem().choose(Optional.of(tableInfo.tableId), ServerConstants.getBaseUris()) + Constants.HDFS_TABLES_DIR + Path.SEPARATOR
+        + tableInfo.tableId + Constants.DEFAULT_TABLET_LOCATION;
+    return new CreateDir(tableInfo);
+  }
+
+  @Override
+  public void undo(long tid, Master master) throws Exception {
+
+  }
+}
\ No newline at end of file


Mime
View raw message