accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From els...@apache.org
Subject [12/35] ACCUMULO-2041 extract tablet classes to new files, move tablet-related code to o.a.a.tserver.tablet, make member variables private
Date Thu, 05 Jun 2014 04:42:54 GMT
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactionTask.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactionTask.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactionTask.java
new file mode 100644
index 0000000..9278cb2
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactionTask.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.tablet;
+
+import java.io.IOException;
+
+import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import org.apache.accumulo.server.fs.FileRef;
+import org.apache.accumulo.trace.instrument.Span;
+import org.apache.accumulo.trace.instrument.Trace;
+import org.apache.accumulo.tserver.MinorCompactionReason;
+import org.apache.accumulo.tserver.compaction.MajorCompactionReason;
+
+class MinorCompactionTask implements Runnable {
+
+  private final Tablet tablet;
+  private long queued;
+  private CommitSession commitSession;
+  private DataFileValue stats;
+  private FileRef mergeFile;
+  private long flushId;
+  private MinorCompactionReason mincReason;
+
+  MinorCompactionTask(Tablet tablet, FileRef mergeFile, CommitSession commitSession, long
flushId, MinorCompactionReason mincReason) {
+    this.tablet = tablet;
+    queued = System.currentTimeMillis();
+    tablet.minorCompactionWaitingToStart();
+    this.commitSession = commitSession;
+    this.mergeFile = mergeFile;
+    this.flushId = flushId;
+    this.mincReason = mincReason;
+  }
+
+  @Override
+  public void run() {
+    tablet.isMinorCompactionRunning();
+    Span minorCompaction = Trace.on("minorCompaction");
+    try {
+      FileRef newMapfileLocation = tablet.getNextMapFilename(mergeFile == null ? "F" : "M");
+      FileRef tmpFileRef = new FileRef(newMapfileLocation.path() + "_tmp");
+      Span span = Trace.start("waitForCommits");
+      synchronized (tablet) {
+        commitSession.waitForCommitsToFinish();
+      }
+      span.stop();
+      span = Trace.start("start");
+      while (true) {
+        try {
+          // the purpose of the minor compaction start event is to keep track of the filename...
in the case
+          // where the metadata table write for the minor compaction finishes and the process
dies before
+          // writing the minor compaction finish event, then the start event+filename in
metadata table will
+          // prevent recovery of duplicate data... the minor compaction start event could
be written at any time
+          // before the metadata write for the minor compaction
+          tablet.getTabletServer().minorCompactionStarted(commitSession, commitSession.getWALogSeq()
+ 1, newMapfileLocation.path().toString());
+          break;
+        } catch (IOException e) {
+          Tablet.log.warn("Failed to write to write ahead log " + e.getMessage(), e);
+        }
+      }
+      span.stop();
+      span = Trace.start("compact");
+      this.stats = tablet.minorCompact(tablet.getTabletServer().getFileSystem(), tablet.getTabletMemory().getMinCMemTable(),
tmpFileRef, newMapfileLocation, mergeFile, true, queued, commitSession, flushId,
+          mincReason);
+      span.stop();
+
+      if (tablet.needsSplit()) {
+        tablet.getTabletServer().executeSplit(tablet);
+      } else {
+        tablet.initiateMajorCompaction(MajorCompactionReason.NORMAL);
+      }
+    } catch (Throwable t) {
+      Tablet.log.error("Unknown error during minor compaction for extent: " + tablet.getExtent(),
t);
+      throw new RuntimeException(t);
+    } finally {
+      tablet.minorCompactionComplete();
+      minorCompaction.data("extent", tablet.getExtent().toString());
+      minorCompaction.data("numEntries", Long.toString(this.stats.getNumEntries()));
+      minorCompaction.data("size", Long.toString(this.stats.getSize()));
+      minorCompaction.stop();
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java
new file mode 100644
index 0000000..6636159
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.tablet;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.impl.Tables;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
+import org.apache.accumulo.core.master.state.tables.TableState;
+import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.conf.TableConfiguration;
+import org.apache.accumulo.server.fs.FileRef;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.problems.ProblemReport;
+import org.apache.accumulo.server.problems.ProblemReports;
+import org.apache.accumulo.server.problems.ProblemType;
+import org.apache.accumulo.tserver.InMemoryMap;
+import org.apache.accumulo.tserver.MinorCompactionReason;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+/**
+ * Compactor used for minor compactions. It hands an in-memory map (and optionally one merge file) to the parent
+ * {@code Compactor} and retries the compaction until it succeeds or the table is found to be deleting.
+ */
+public class MinorCompactor extends Compactor {
+  
+  private static final Logger log = Logger.getLogger(MinorCompactor.class);
+  
+  private static final Map<FileRef,DataFileValue> EMPTY_MAP = Collections.emptyMap();
+  
+  /**
+   * Builds the input-file map passed to the parent constructor: empty when there is no merge file, otherwise a single
+   * entry mapping the merge file to its data file value.
+   */
+  private static Map<FileRef,DataFileValue> toFileMap(FileRef mergeFile, DataFileValue
dfv) {
+    if (mergeFile == null)
+      return EMPTY_MAP;
+    
+    return Collections.singletonMap(mergeFile, dfv);
+  }
+  
+  /**
+   * Creates a compactor whose environment always reports compaction as enabled and uses the {@code minc} iterator
+   * scope. No extra iterators are supplied and the ordinal of {@code mincReason} is passed to the parent as the reason.
+   */
+  public MinorCompactor(VolumeManager fs, InMemoryMap imm, FileRef mergeFile, DataFileValue
dfv, FileRef outputFile, TableConfiguration acuTableConf,
+      KeyExtent extent, MinorCompactionReason mincReason) {
+    super(fs, toFileMap(mergeFile, dfv), imm, outputFile, true, acuTableConf, extent, new
CompactionEnv() {
+      
+      @Override
+      public boolean isCompactionEnabled() {
+        return true;
+      }
+      
+      @Override
+      public IteratorScope getIteratorScope() {
+        return IteratorScope.minc;
+      }
+    }, Collections.<IteratorSetting>emptyList(), mincReason.ordinal());
+  }
+  
+  /**
+   * Returns true only when the table state could be read and is DELETING; any failure to read the state is logged and
+   * treated as "not deleting".
+   */
+  private boolean isTableDeleting() {
+    try {
+      return Tables.getTableState(HdfsZooInstance.getInstance(), extent.getTableId().toString())
== TableState.DELETING;
+    } catch (Exception e) {
+      log.warn("Failed to determine if table " + extent.getTableId() + " was deleting ",
e);
+      return false; // can not get positive confirmation that its deleting.
+    }
+  }
+  
+  /**
+   * Runs the compaction, retrying after IOException or RuntimeException. Each failure files a FILE_WRITE problem report,
+   * sleeps for a randomized, growing interval (starting at 100 ms, multiplied by 4 per failure, capped at 3 minutes),
+   * removes the partially written output file and gives up with empty stats if the table is deleting. A success that
+   * follows a reported failure deletes the problem report again.
+   * 
+   * @return the stats from the parent compaction, or {@code new CompactionStats(0, 0)} when the table is deleting
+   * @throws IllegalStateException
+   *           if the parent compaction throws CompactionCanceledException
+   */
+  @Override
+  public CompactionStats call() {
+    log.debug("Begin minor compaction " + getOutputFile() + " " + getExtent());
+    
+    // output to new MapFile with a temporary name
+    int sleepTime = 100;
+    double growthFactor = 4;
+    int maxSleepTime = 1000 * 60 * 3; // 3 minutes
+    boolean reportedProblem = false;
+    
+    runningCompactions.add(this);
+    try {
+      do {
+        try {
+          CompactionStats ret = super.call();
+          
+          // log.debug(String.format("MinC %,d recs in | %,d recs out | %,d recs/sec | %6.3f
secs | %,d bytes ",map.size(), entriesCompacted,
+          // (int)(map.size()/((t2 - t1)/1000.0)), (t2 - t1)/1000.0, estimatedSizeInBytes()));
+          
+          if (reportedProblem) {
+            ProblemReports.getInstance().deleteProblemReport(getExtent().getTableId().toString(),
ProblemType.FILE_WRITE, getOutputFile());
+          }
+          
+          return ret;
+        } catch (IOException e) {
+          log.warn("MinC failed (" + e.getMessage() + ") to create " + getOutputFile() +
" retrying ...");
+          ProblemReports.getInstance().report(new ProblemReport(getExtent().getTableId().toString(),
ProblemType.FILE_WRITE, getOutputFile(), e));
+          reportedProblem = true;
+        } catch (RuntimeException e) {
+          // if this is coming from a user iterator, it is possible that the user could change
the iterator config and that the
+          // minor compaction would succeed
+          log.warn("MinC failed (" + e.getMessage() + ") to create " + getOutputFile() +
" retrying ...", e);
+          ProblemReports.getInstance().report(new ProblemReport(getExtent().getTableId().toString(),
ProblemType.FILE_WRITE, getOutputFile(), e));
+          reportedProblem = true;
+        } catch (CompactionCanceledException e) {
+          throw new IllegalStateException(e);
+        }
+        
+        // only reached after a failed attempt: back off for sleepTime plus a random amount below sleepTime
+        Random random = new Random();
+        
+        int sleep = sleepTime + random.nextInt(sleepTime);
+        log.debug("MinC failed sleeping " + sleep + " ms before retrying");
+        UtilWaitThread.sleep(sleep);
+        sleepTime = (int) Math.round(Math.min(maxSleepTime, sleepTime * growthFactor));
+        
+        // clean up
+        try {
+          if (getFileSystem().exists(new Path(getOutputFile()))) {
+            getFileSystem().deleteRecursively(new Path(getOutputFile()));
+          }
+        } catch (IOException e) {
+          log.warn("Failed to delete failed MinC file " + getOutputFile() + " " + e.getMessage());
+        }
+        
+        // stop retrying once the table is confirmed to be deleting
+        if (isTableDeleting())
+          return new CompactionStats(0, 0);
+        
+      } while (true);
+    } finally {
+      thread = null;
+      runningCompactions.remove(this);
+    }
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Rate.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Rate.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Rate.java
new file mode 100644
index 0000000..450fffe
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Rate.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.tablet;
+
+/**
+ * Turns samples of a counter into an exponentially smoothed rate, expressed per second when the sample times are given in
+ * milliseconds.
+ */
+public class Rate {
+  // negative until the first sample has been recorded
+  private long lastCounter = -1;
+  private long lastTime = -1;
+  private double current = 0.0;
+  final double ratio;
+
+  /**
+   * Turn a counter into an exponentially smoothed rate over time.
+   *
+   * @param ratio
+   *          the rate at which each update influences the curve; must be (0., 1.0)
+   * @throws IllegalArgumentException
+   *           if ratio is not strictly between 0 and 1
+   */
+  public Rate(double ratio) {
+    if (ratio <= 0. || ratio >= 1.0)
+      throw new IllegalArgumentException("ratio must be > 0. and < 1.0");
+    this.ratio = ratio;
+  }
+
+  /**
+   * Records a sample taken now, using {@link System#currentTimeMillis()} as the sample time.
+   *
+   * @param counter
+   *          current value of the counter
+   * @return the smoothed rate after this sample
+   */
+  public double update(long counter) {
+    return update(System.currentTimeMillis(), counter);
+  }
+
+  /**
+   * Records a sample. The first sample only initializes the state and returns the current rate unchanged; every later
+   * sample blends the rate observed since the previous sample into the smoothed value.
+   *
+   * @param when
+   *          sample time in milliseconds; must be later than the previous sample time
+   * @param counter
+   *          current value of the counter
+   * @return the smoothed rate after this sample
+   * @throws IllegalArgumentException
+   *           if when is not later than the previous sample time
+   */
+  public synchronized double update(long when, long counter) {
+    if (lastCounter < 0) {
+      lastTime = when;
+      lastCounter = counter;
+      return current;
+    }
+    // an equal time would divide by zero (infinite or NaN rate) and an earlier time would flip the sign of the rate;
+    // either value would then be folded into the smoothed result
+    if (when <= lastTime) {
+      throw new IllegalArgumentException("update time <= last value");
+    }
+    double keep = 1. - ratio;
+    current = keep * current + ratio * (counter - lastCounter) * 1000. / (when - lastTime);
+    lastTime = when;
+    lastCounter = counter;
+    return current;
+  }
+
+  /**
+   * @return the most recently computed smoothed rate
+   */
+  public synchronized double rate() {
+    return this.current;
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/RootFiles.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/RootFiles.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/RootFiles.java
new file mode 100644
index 0000000..3a8bb08
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/RootFiles.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.tablet;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Set;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.server.fs.FileRef;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+/**
+ * 
+ */
+public class RootFiles {
+
+  private static Logger log = Logger.getLogger(RootFiles.class);
+
+  public static void prepareReplacement(VolumeManager fs, Path location, Set<FileRef>
oldDatafiles, String compactName) throws IOException {
+    for (FileRef ref : oldDatafiles) {
+      Path path = ref.path();
+      DatafileManager.rename(fs, path, new Path(location + "/delete+" + compactName + "+"
+ path.getName()));
+    }
+  }
+
+  public static void renameReplacement(VolumeManager fs, FileRef tmpDatafile, FileRef newDatafile)
throws IOException {
+    if (fs.exists(newDatafile.path())) {
+      log.error("Target map file already exist " + newDatafile, new Exception());
+      throw new IllegalStateException("Target map file already exist " + newDatafile);
+    }
+
+    DatafileManager.rename(fs, tmpDatafile.path(), newDatafile.path());
+  }
+
+  public static void finishReplacement(AccumuloConfiguration acuTableConf, VolumeManager
fs, Path location, Set<FileRef> oldDatafiles, String compactName)
+      throws IOException {
+    // start deleting files, if we do not finish they will be cleaned
+    // up later
+    for (FileRef ref : oldDatafiles) {
+      Path path = ref.path();
+      Path deleteFile = new Path(location + "/delete+" + compactName + "+" + path.getName());
+      if (acuTableConf.getBoolean(Property.GC_TRASH_IGNORE) || !fs.moveToTrash(deleteFile))
+        fs.deleteRecursively(deleteFile);
+    }
+  }
+
+  public static void replaceFiles(AccumuloConfiguration acuTableConf, VolumeManager fs, Path
location, Set<FileRef> oldDatafiles, FileRef tmpDatafile,
+      FileRef newDatafile) throws IOException {
+    String compactName = newDatafile.path().getName();
+
+    prepareReplacement(fs, location, oldDatafiles, compactName);
+    renameReplacement(fs, tmpDatafile, newDatafile);
+    finishReplacement(acuTableConf, fs, location, oldDatafiles, compactName);
+  }
+
+  public static Collection<String> cleanupReplacement(VolumeManager fs, FileStatus[]
files, boolean deleteTmp) throws IOException {
+    /*
+     * called in constructor and before major compactions
+     */
+    Collection<String> goodFiles = new ArrayList<String>(files.length);
+
+    for (FileStatus file : files) {
+
+      String path = file.getPath().toString();
+      if (file.getPath().toUri().getScheme() == null) {
+        // depending on the behavior of HDFS, if list status does not return fully qualified
volumes then could switch to the default volume
+        throw new IllegalArgumentException("Require fully qualified paths " + file.getPath());
+      }
+
+      String filename = file.getPath().getName();
+
+      // check for incomplete major compaction, this should only occur
+      // for root tablet
+      if (filename.startsWith("delete+")) {
+        String expectedCompactedFile = path.substring(0, path.lastIndexOf("/delete+")) +
"/" + filename.split("\\+")[1];
+        if (fs.exists(new Path(expectedCompactedFile))) {
+          // compaction finished, but did not finish deleting compacted files.. so delete
it
+          if (!fs.deleteRecursively(file.getPath()))
+            log.warn("Delete of file: " + file.getPath().toString() + " return false");
+          continue;
+        }
+        // compaction did not finish, so put files back
+
+        // reset path and filename for rest of loop
+        filename = filename.split("\\+", 3)[2];
+        path = path.substring(0, path.lastIndexOf("/delete+")) + "/" + filename;
+
+        DatafileManager.rename(fs, file.getPath(), new Path(path));
+      }
+
+      if (filename.endsWith("_tmp")) {
+        if (deleteTmp) {
+          log.warn("cleaning up old tmp file: " + path);
+          if (!fs.deleteRecursively(file.getPath()))
+            log.warn("Delete of tmp file: " + file.getPath().toString() + " return false");
+
+        }
+        continue;
+      }
+
+      if (!filename.startsWith(Constants.MAPFILE_EXTENSION + "_") && !FileOperations.getValidExtensions().contains(filename.split("\\.")[1]))
{
+        log.error("unknown file in tablet" + path);
+        continue;
+      }
+
+      goodFiles.add(path);
+    }
+
+    return goodFiles;
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanBatch.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanBatch.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanBatch.java
new file mode 100644
index 0000000..0ea76d3
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanBatch.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.tablet;
+
+import java.util.List;
+
+/**
+ * Holder for one batch of scan results. Both fields are final and set once by the constructor.
+ */
+public class ScanBatch {
+  /** the entries of this batch */
+  public final List<KVEntry> results;
+  /** flag supplied by the creator; presumably true when further results remain -- confirm against the caller */
+  public final boolean more;
+
+  /**
+   * @param results
+   *          entries of this batch, stored as given (no copy is made)
+   * @param more
+   *          value for {@link #more}
+   */
+  ScanBatch(List<KVEntry> results, boolean more) {
+    this.more = more;
+    this.results = results;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java
new file mode 100644
index 0000000..980a082
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.tablet;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.accumulo.core.data.Column;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.data.thrift.IterInfo;
+import org.apache.accumulo.core.iterators.IterationInterruptedException;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.IteratorUtil;
+import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.system.ColumnFamilySkippingIterator;
+import org.apache.accumulo.core.iterators.system.ColumnQualifierFilter;
+import org.apache.accumulo.core.iterators.system.DeletingIterator;
+import org.apache.accumulo.core.iterators.system.InterruptibleIterator;
+import org.apache.accumulo.core.iterators.system.MultiIterator;
+import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator.DataSource;
+import org.apache.accumulo.core.iterators.system.StatsIterator;
+import org.apache.accumulo.core.iterators.system.VisibilityFilter;
+import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.server.fs.FileRef;
+import org.apache.accumulo.tserver.FileManager.ScanFileManager;
+import org.apache.accumulo.tserver.InMemoryMap.MemoryIterator;
+import org.apache.accumulo.tserver.TabletIteratorEnvironment;
+import org.apache.accumulo.tserver.TabletServer;
+
+/**
+ * DataSource for a scan over one tablet. It lazily builds the scan iterator stack from the tablet's in-memory iterators
+ * and its reserved data files, and rebuilds it when the tablet's data source deletion count changes.
+ */
+class ScanDataSource implements DataSource {
+
+  // data source state
+  private final Tablet tablet;
+  private ScanFileManager fileManager;
+  private SortedKeyValueIterator<Key,Value> iter;
+  // deletion count of the tablet at the time the current iterator was (or will be) built; see isCurrent()
+  private long expectedDeletionCount;
+  private List<MemoryIterator> memIters = null;
+  private long fileReservationId;
+  private AtomicBoolean interruptFlag;
+  private StatsIterator statsIterator;
+
+  private final ScanOptions options;
+
+  /**
+   * Creates a data source from individual scan settings. The settings are wrapped in a ScanOptions with a num of -1 and
+   * isolation turned off.
+   */
+  ScanDataSource(Tablet tablet, Authorizations authorizations, byte[] defaultLabels, HashSet<Column>
columnSet, List<IterInfo> ssiList, Map<String,Map<String,String>> ssio,
+      AtomicBoolean interruptFlag) {
+    this.tablet = tablet;
+    expectedDeletionCount = tablet.getDataSourceDeletions();
+    this.options = new ScanOptions(-1, authorizations, defaultLabels, columnSet, ssiList,
ssio, interruptFlag, false);
+    this.interruptFlag = interruptFlag;
+  }
+
+  /**
+   * Creates a data source from prepared scan options; the interrupt flag is taken from the options.
+   */
+  ScanDataSource(Tablet tablet, ScanOptions options) {
+    this.tablet = tablet;
+    expectedDeletionCount = tablet.getDataSourceDeletions();
+    this.options = options;
+    this.interruptFlag = options.interruptFlag;
+  }
+
+  /**
+   * When this source is no longer current, gives back the memory iterators and the file reservation, releases the open
+   * files and drops the cached iterator so that the next call to iterator() builds a new one. Always returns this.
+   */
+  @Override
+  public DataSource getNewDataSource() {
+    if (!isCurrent()) {
+      // log.debug("Switching data sources during a scan");
+      if (memIters != null) {
+        tablet.getTabletMemory().returnIterators(memIters);
+        memIters = null;
+        tablet.getDatafileManager().returnFilesForScan(fileReservationId);
+        fileReservationId = -1;
+      }
+
+      if (fileManager != null)
+        fileManager.releaseOpenFiles(false);
+
+      expectedDeletionCount = tablet.getDataSourceDeletions();
+      iter = null;
+
+      return this;
+    } else
+      return this;
+  }
+
+  /**
+   * @return true while the tablet's data source deletion count still equals the count recorded by this source
+   */
+  @Override
+  public boolean isCurrent() {
+    return expectedDeletionCount == tablet.getDataSourceDeletions();
+  }
+
+  /**
+   * @return the cached scan iterator, creating it on first use
+   */
+  @Override
+  public SortedKeyValueIterator<Key,Value> iterator() throws IOException {
+    if (iter == null)
+      iter = createIterator();
+    return iter;
+  }
+
+  /**
+   * Builds the iterator stack. While holding the tablet lock it verifies that previously held resources were released,
+   * that the tablet is open and that the scan was not interrupted, then obtains memory iterators and reserves the data
+   * files. The files are opened after the lock is released.
+   * 
+   * @throws IllegalStateException
+   *           if memory iterators or open files from an earlier iterator were not released
+   * @throws TabletClosedException
+   *           if the tablet is closed
+   * @throws IterationInterruptedException
+   *           if the interrupt flag is set
+   */
+  private SortedKeyValueIterator<Key,Value> createIterator() throws IOException {
+
+    Map<FileRef,DataFileValue> files;
+
+    synchronized (tablet) {
+
+      if (memIters != null)
+        throw new IllegalStateException("Tried to create new scan iterator w/o releasing
memory");
+
+      if (tablet.isClosed())
+        throw new TabletClosedException();
+
+      if (interruptFlag.get())
+        throw new IterationInterruptedException(tablet.getExtent().toString() + " " + interruptFlag.hashCode());
+
+      // only acquire the file manager when we know the tablet is open
+      if (fileManager == null) {
+        fileManager = tablet.getTabletResources().newScanFileManager();
+        tablet.addActiveScans(this);
+      }
+
+      if (fileManager.getNumOpenFiles() != 0)
+        throw new IllegalStateException("Tried to create new scan iterator w/o releasing
files");
+
+      // set this before trying to get iterators in case
+      // getIterators() throws an exception
+      expectedDeletionCount = tablet.getDataSourceDeletions();
+
+      memIters = tablet.getTabletMemory().getIterators();
+      Pair<Long,Map<FileRef,DataFileValue>> reservation = tablet.getDatafileManager().reserveFilesForScan();
+      fileReservationId = reservation.getFirst();
+      files = reservation.getSecond();
+    }
+
+    Collection<InterruptibleIterator> mapfiles = fileManager.openFiles(files, options.isolated);
+
+    List<SortedKeyValueIterator<Key,Value>> iters = new ArrayList<SortedKeyValueIterator<Key,Value>>(mapfiles.size()
+ memIters.size());
+
+    iters.addAll(mapfiles);
+    iters.addAll(memIters);
+
+    // every source, file or memory, shares the scan's interrupt flag
+    for (SortedKeyValueIterator<Key,Value> skvi : iters)
+      ((InterruptibleIterator) skvi).setInterruptFlag(interruptFlag);
+
+    // system iterators are stacked in this order: multi -> stats -> deleting -> column family skipping ->
+    // column qualifier filter -> visibility filter, with the configured scan iterators loaded on top
+    MultiIterator multiIter = new MultiIterator(iters, tablet.getExtent());
+
+    TabletIteratorEnvironment iterEnv = new TabletIteratorEnvironment(IteratorScope.scan,
tablet.getTableConfiguration(), fileManager, files);
+
+    statsIterator = new StatsIterator(multiIter, TabletServer.seekCount, tablet.getScannedCounter());
+
+    DeletingIterator delIter = new DeletingIterator(statsIterator, false);
+
+    ColumnFamilySkippingIterator cfsi = new ColumnFamilySkippingIterator(delIter);
+
+    ColumnQualifierFilter colFilter = new ColumnQualifierFilter(cfsi, options.columnSet);
+
+    VisibilityFilter visFilter = new VisibilityFilter(colFilter, options.authorizations,
options.defaultLabels);
+
+    return iterEnv.getTopLevelIterator(IteratorUtil
+        .loadIterators(IteratorScope.scan, visFilter, tablet.getExtent(), tablet.getTableConfiguration(),
options.ssiList, options.ssio, iterEnv));
+  }
+
+  /**
+   * Releases everything held by this source: memory iterators and the file reservation, the registration as an active
+   * scan on the tablet, the open files and finally the collected statistics, which are reported.
+   * 
+   * @param sawErrors
+   *          passed to the file manager when the open files are released
+   */
+  void close(boolean sawErrors) {
+
+    if (memIters != null) {
+      tablet.getTabletMemory().returnIterators(memIters);
+      memIters = null;
+      tablet.getDatafileManager().returnFilesForScan(fileReservationId);
+      fileReservationId = -1;
+    }
+
+    synchronized (tablet) {
+      // wake up threads waiting on the tablet once removeScan() reports zero
+      if (tablet.removeScan(this) == 0)
+        tablet.notifyAll();
+    }
+
+    if (fileManager != null) {
+      fileManager.releaseOpenFiles(sawErrors);
+      fileManager = null;
+    }
+
+    if (statsIterator != null) {
+      statsIterator.report();
+    }
+
+  }
+
+  /**
+   * Sets the interrupt flag shared with the iterators created by this source.
+   */
+  public void interrupt() {
+    interruptFlag.set(true);
+  }
+
+  /**
+   * Not supported by this data source.
+   * 
+   * @throws UnsupportedOperationException
+   *           always
+   */
+  @Override
+  public DataSource getDeepCopyDataSource(IteratorEnvironment env) {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Reattaches the file manager, if one has been acquired.
+   */
+  public void reattachFileManager() throws IOException {
+    if (fileManager != null)
+      fileManager.reattach();
+  }
+  
+  /**
+   * Detaches the file manager, if one has been acquired.
+   */
+  public void detachFileManager() {
+    if (fileManager != null)
+      fileManager.detach();
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanOptions.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanOptions.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanOptions.java
new file mode 100644
index 0000000..9382ea7
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanOptions.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.tablet;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.accumulo.core.data.Column;
+import org.apache.accumulo.core.data.thrift.IterInfo;
+import org.apache.accumulo.core.security.Authorizations;
+
+class ScanOptions {
+
+  final Authorizations authorizations;
+  final byte[] defaultLabels;
+  final Set<Column> columnSet;
+  final List<IterInfo> ssiList;
+  final Map<String,Map<String,String>> ssio;
+  final AtomicBoolean interruptFlag;
+  final int num;
+  final boolean isolated;
+
+  ScanOptions(int num, Authorizations authorizations, byte[] defaultLabels, Set<Column> columnSet, List<IterInfo> ssiList,
+      Map<String,Map<String,String>> ssio, AtomicBoolean interruptFlag, boolean isolated) {
+    this.num = num;
+    this.authorizations = authorizations;
+    this.defaultLabels = defaultLabels;
+    this.columnSet = columnSet;
+    this.ssiList = ssiList;
+    this.ssio = ssio;
+    this.interruptFlag = interruptFlag;
+    this.isolated = isolated;
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java
new file mode 100644
index 0000000..96379fc
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.tablet;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IterationInterruptedException;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator;
+import org.apache.log4j.Logger;
+
+public class Scanner {
+  private static final Logger log = Logger.getLogger(Scanner.class);
+
+  private final Tablet tablet;
+  private final ScanOptions options;
+  private Range range;
+  private SortedKeyValueIterator<Key,Value> isolatedIter;
+  private ScanDataSource isolatedDataSource;
+  private boolean sawException = false;
+  private boolean scanClosed = false;
+
+  Scanner(Tablet tablet, Range range, ScanOptions options) {
+    this.tablet = tablet;
+    this.range = range;
+    this.options = options;
+  }
+
+  public synchronized ScanBatch read() throws IOException, TabletClosedException {
+
+    if (sawException)
+      throw new IllegalStateException("Tried to use scanner after exception occurred.");
+
+    if (scanClosed)
+      throw new IllegalStateException("Tried to use scanner after it was closed.");
+
+    Batch results = null;
+
+    ScanDataSource dataSource;
+
+    if (options.isolated) {
+      if (isolatedDataSource == null)
+        isolatedDataSource = new ScanDataSource(tablet, options);
+      dataSource = isolatedDataSource;
+    } else {
+      dataSource = new ScanDataSource(tablet, options);
+    }
+
+    try {
+
+      SortedKeyValueIterator<Key,Value> iter;
+
+      if (options.isolated) {
+        if (isolatedIter == null)
+          isolatedIter = new SourceSwitchingIterator(dataSource, true);
+        else
+          isolatedDataSource.reattachFileManager();
+        iter = isolatedIter;
+      } else {
+        iter = new SourceSwitchingIterator(dataSource, false);
+      }
+
+      results = tablet.nextBatch(iter, range, options.num, options.columnSet);
+
+      if (results.results == null) {
+        range = null;
+        return new ScanBatch(new ArrayList<KVEntry>(), false);
+      } else if (results.continueKey == null) {
+        return new ScanBatch(results.results, false);
+      } else {
+        range = new Range(results.continueKey, !results.skipContinueKey, range.getEndKey(), range.isEndKeyInclusive());
+        return new ScanBatch(results.results, true);
+      }
+
+    } catch (IterationInterruptedException iie) {
+      sawException = true;
+      if (tablet.isClosed())
+        throw new TabletClosedException(iie);
+      else
+        throw iie;
+    } catch (IOException ioe) {
+      if (tablet.shutdownInProgress()) {
+        log.debug("IOException while shutdown in progress ", ioe);
+        throw new TabletClosedException(ioe); // assume IOException was caused by execution of HDFS shutdown hook
+      }
+
+      sawException = true;
+      dataSource.close(true);
+      throw ioe;
+    } catch (RuntimeException re) {
+      sawException = true;
+      throw re;
+    } finally {
+      // code in finally block because always want
+      // to return mapfiles, even when exception is thrown
+      if (!options.isolated)
+        dataSource.close(false);
+      else 
+        dataSource.detachFileManager();
+      
+      if (results != null && results.results != null)
+        tablet.updateQueryStats(results.results.size(), results.numBytes);
+    }
+  }
+
+  // close and read are synchronized because can not call close on the data source while it is in use
+  // this could lead to the case where file iterators that are in use by a thread are returned
+  // to the pool... this would be bad
+  public void close() {
+    options.interruptFlag.set(true);
+    synchronized (this) {
+      scanClosed = true;
+      if (isolatedDataSource != null)
+        isolatedDataSource.close(false);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/SplitInfo.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/SplitInfo.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/SplitInfo.java
new file mode 100644
index 0000000..084503a
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/SplitInfo.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.tablet;
+
+import java.util.SortedMap;
+
+import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import org.apache.accumulo.server.fs.FileRef;
+import org.apache.accumulo.server.master.state.TServerInstance;
+
+/**
+ * operations are disallowed while we split which is ok since splitting is fast
+ * 
+ * a minor compaction should have taken place before calling this so there should be relatively little left to compact
+ * 
+ * we just need to make sure major compactions aren't occurring if we have the major compactor thread decide who needs splitting we can avoid synchronization
+ * issues with major compactions
+ * 
+ */
+
+public class SplitInfo {
+  final String dir;
+  final SortedMap<FileRef,DataFileValue> datafiles;
+  final String time;
+  final long initFlushID;
+  final long initCompactID;
+  final TServerInstance lastLocation;
+
+  SplitInfo(String d, SortedMap<FileRef,DataFileValue> dfv, String time, long initFlushID, long initCompactID, TServerInstance lastLocation) {
+    this.dir = d;
+    this.datafiles = dfv;
+    this.time = time;
+    this.initFlushID = initFlushID;
+    this.initCompactID = initCompactID;
+    this.lastLocation = lastLocation;
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/SplitRowSpec.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/SplitRowSpec.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/SplitRowSpec.java
new file mode 100644
index 0000000..75cf91e
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/SplitRowSpec.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.tablet;
+
+import org.apache.hadoop.io.Text;
+
+class SplitRowSpec {
+  final double splitRatio;
+  final Text row;
+
+  SplitRowSpec(double splitRatio, Text row) {
+    this.splitRatio = splitRatio;
+    this.row = row;
+  }
+}
\ No newline at end of file


Mime
View raw message