accumulo-commits mailing list archives

From ktur...@apache.org
Subject svn commit: r1297790 - in /incubator/accumulo/branches/1.4: docs/examples/ src/core/src/main/java/org/apache/accumulo/core/client/ src/core/src/main/java/org/apache/accumulo/core/client/admin/ src/core/src/main/java/org/apache/accumulo/core/client/impl...
Date Tue, 06 Mar 2012 23:23:30 GMT
Author: kturner
Date: Tue Mar  6 23:23:29 2012
New Revision: 1297790

URL: http://svn.apache.org/viewvc?rev=1297790&view=rev
Log:
ACCUMULO-387 ACCUMULO-175
 * initial checkin of map reduce directly over files
 * stopped using AccumuloClassLoader for client side code that loads iterators.

Added:
    incubator/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java
    incubator/accumulo/branches/1.4/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/UniqueColumns.java
Modified:
    incubator/accumulo/branches/1.4/docs/examples/README.mapred
    incubator/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java
    incubator/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperationsImpl.java
    incubator/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
    incubator/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mock/MockScannerBase.java
    incubator/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/iterators/IteratorUtil.java
    incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousVerify.java
    incubator/accumulo/branches/1.4/test/system/continuous/continuous-env.sh.example
    incubator/accumulo/branches/1.4/test/system/continuous/run-verify.sh

Modified: incubator/accumulo/branches/1.4/docs/examples/README.mapred
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/docs/examples/README.mapred?rev=1297790&r1=1297789&r2=1297790&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/docs/examples/README.mapred (original)
+++ incubator/accumulo/branches/1.4/docs/examples/README.mapred Tue Mar  6 23:23:29 2012
@@ -89,3 +89,9 @@ counts.
     tserver, count:20080906 []    1
     tserver.compaction.major.concurrent.max count:20080906 []    1
     ...
+
+Another example to look at is
+org.apache.accumulo.examples.simple.mapreduce.UniqueColumns.  This example
+computes the unique set of columns in a table and shows how a map reduce job
+can directly read a table's files from HDFS.
+

Modified: incubator/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java?rev=1297790&r1=1297789&r2=1297790&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java (original)
+++ incubator/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java Tue Mar  6 23:23:29 2012
@@ -188,7 +188,7 @@ public class ClientSideIteratorScanner e
         
         @Override
         public void registerSideChannel(SortedKeyValueIterator<Key,Value> iter) {}
-      });
+      }, false);
     } catch (IOException e) {
       throw new RuntimeException(e);
     }

Modified: incubator/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperationsImpl.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperationsImpl.java?rev=1297790&r1=1297789&r2=1297790&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperationsImpl.java (original)
+++ incubator/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperationsImpl.java Tue Mar  6 23:23:29 2012
@@ -39,6 +39,7 @@ import org.apache.accumulo.core.client.A
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableDeletedException;
 import org.apache.accumulo.core.client.TableExistsException;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.TableOfflineException;
@@ -847,11 +848,18 @@ public class TableOperationsImpl extends
       return Collections.singleton(range);
     
     Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
-    TabletLocator tl = TabletLocator.getInstance(instance, credentials, new Text(Tables.getTableId(instance, tableName)));
+    String tableId = Tables.getTableId(instance, tableName);
+    TabletLocator tl = TabletLocator.getInstance(instance, credentials, new Text(tableId));
     while (!tl.binRanges(Collections.singletonList(range), binnedRanges).isEmpty()) {
+      if (!Tables.exists(instance, tableId))
+        throw new TableDeletedException(tableId);
+      if (Tables.getTableState(instance, tableId) == TableState.OFFLINE)
+        throw new TableOfflineException(instance, tableId);
+
       log.warn("Unable to locate bins for specified range. Retrying.");
       // sleep randomly between 100 and 200ms
       UtilWaitThread.sleep(100 + (int) (Math.random() * 100));
+      binnedRanges.clear();
     }
     
     // group key extents to get <= maxSplits

Added: incubator/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java?rev=1297790&view=auto
==============================================================================
--- incubator/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java (added)
+++ incubator/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java Tue Mar  6 23:23:29 2012
@@ -0,0 +1,408 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.RowIterator;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Column;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.KeyValue;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.file.FileUtil;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.IteratorUtil;
+import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.system.ColumnQualifierFilter;
+import org.apache.accumulo.core.iterators.system.DeletingIterator;
+import org.apache.accumulo.core.iterators.system.MultiIterator;
+import org.apache.accumulo.core.iterators.system.VisibilityFilter;
+import org.apache.accumulo.core.master.state.tables.TableState;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.core.security.thrift.AuthInfo;
+import org.apache.accumulo.core.util.ArgumentChecker;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.core.util.LocalityGroupUtil;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.Text;
+
+class OfflineIterator implements Iterator<Entry<Key,Value>> {
+  
+  static class OfflineIteratorEnvironment implements IteratorEnvironment {
+    @Override
+    public SortedKeyValueIterator<Key,Value> reserveMapFileReader(String mapFileName) throws IOException {
+      throw new NotImplementedException();
+    }
+    
+    @Override
+    public AccumuloConfiguration getConfig() {
+      return AccumuloConfiguration.getDefaultConfiguration();
+    }
+    
+    @Override
+    public IteratorScope getIteratorScope() {
+      return IteratorScope.scan;
+    }
+    
+    @Override
+    public boolean isFullMajorCompaction() {
+      return false;
+    }
+    
+    private ArrayList<SortedKeyValueIterator<Key,Value>> topLevelIterators = new ArrayList<SortedKeyValueIterator<Key,Value>>();
+    
+    @Override
+    public void registerSideChannel(SortedKeyValueIterator<Key,Value> iter) {
+      topLevelIterators.add(iter);
+    }
+    
+    SortedKeyValueIterator<Key,Value> getTopLevelIterator(SortedKeyValueIterator<Key,Value> iter) {
+      if (topLevelIterators.isEmpty())
+        return iter;
+      ArrayList<SortedKeyValueIterator<Key,Value>> allIters = new ArrayList<SortedKeyValueIterator<Key,Value>>(topLevelIterators);
+      allIters.add(iter);
+      return new MultiIterator(allIters, false);
+    }
+  }
+
+  private SortedKeyValueIterator<Key,Value> iter;
+  private Range range;
+  private KeyExtent currentExtent;
+  private Connector conn;
+  private String tableId;
+  private AuthInfo credentials;
+  private Authorizations authorizations;
+  private Instance instance;
+  private ScannerOptions options;
+  private ArrayList<SortedKeyValueIterator<Key,Value>> readers;
+
+  /**
+   * @param options
+   * @param instance
+   * @param credentials
+   * @param authorizations
+   * @param table
+   * @param range
+   */
+  public OfflineIterator(ScannerOptions options, Instance instance, AuthInfo credentials, Authorizations authorizations, Text table, Range range) {
+    this.options = new ScannerOptions(options);
+    this.instance = instance;
+    this.range = range;
+    
+    if (this.options.fetchedColumns.size() > 0) {
+      range = range.bound(this.options.fetchedColumns.first(), this.options.fetchedColumns.last());
+    }
+
+    this.tableId = table.toString();
+    this.authorizations = authorizations;
+    this.credentials = credentials;
+    this.readers = new ArrayList<SortedKeyValueIterator<Key,Value>>();
+    
+    try {
+      conn = instance.getConnector(credentials);
+      nextTablet();
+      
+      while (iter != null && !iter.hasTop())
+        nextTablet();
+
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public boolean hasNext() {
+    return iter != null && iter.hasTop();
+  }
+  
+  @Override
+  public Entry<Key,Value> next() {
+    try {
+      byte[] v = iter.getTopValue().get();
+      // copy just like tablet server does, do this before calling next
+      KeyValue ret = new KeyValue(new Key(iter.getTopKey()), Arrays.copyOf(v, v.length));
+
+      iter.next();
+      
+      while (iter != null && !iter.hasTop())
+        nextTablet();
+      
+      return ret;
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+  
+  /**
+   * @throws TableNotFoundException
+   * @throws IOException
+   * @throws AccumuloException
+   * 
+   */
+  private void nextTablet() throws TableNotFoundException, AccumuloException, IOException {
+    
+    Range nextRange = null;
+    
+    if (currentExtent == null) {
+      Text startRow;
+      
+      if (range.getStartKey() != null)
+        startRow = range.getStartKey().getRow();
+      else
+        startRow = new Text();
+      
+      nextRange = new Range(new KeyExtent(new Text(tableId), startRow, null).getMetadataEntry(), true, null, false);
+    } else {
+      
+      if (currentExtent.getEndRow() == null) {
+        iter = null;
+        return;
+      }
+
+      if (range.afterEndKey(new Key(currentExtent.getEndRow()).followingKey(PartialKey.ROW))) {
+        iter = null;
+        return;
+      }
+
+      nextRange = new Range(currentExtent.getMetadataEntry(), false, null, false);
+    }
+
+    List<String> relFiles = new ArrayList<String>();
+    
+    Pair<KeyExtent,String> eloc = getTabletFiles(nextRange, relFiles);
+
+    while (eloc.getSecond() != null) {
+      if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) {
+        Tables.clearCache(instance);
+        if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) {
+          throw new AccumuloException("Table is online " + tableId + " cannot scan tablet in offline mode " + eloc.getFirst());
+        }
+      }
+      
+      UtilWaitThread.sleep(250);
+      
+      eloc = getTabletFiles(nextRange, relFiles);
+    }
+    
+    KeyExtent extent = eloc.getFirst();
+    
+    if (!extent.getTableId().toString().equals(tableId)) {
+      throw new AccumuloException(" did not find tablets for table " + tableId + " " + extent);
+    }
+
+    if (currentExtent != null && !extent.isPreviousExtent(currentExtent))
+      throw new AccumuloException(" " + currentExtent + " is not previous extent " + extent);
+    
+    String tablesDir = Constants.getTablesDir(AccumuloConfiguration.getSiteConfiguration());
+    List<String> absFiles = new ArrayList<String>();
+    for (String relPath : relFiles) {
+      if (relPath.startsWith(".."))
+        absFiles.add(tablesDir + relPath.substring(2));
+      else
+        absFiles.add(tablesDir + "/" + tableId + relPath);
+    }
+    
+    iter = createIterator(extent, absFiles);
+    iter.seek(range, LocalityGroupUtil.families(options.fetchedColumns), options.fetchedColumns.size() > 0);
+    currentExtent = extent;
+    
+  }
+  
+  private Pair<KeyExtent,String> getTabletFiles(Range nextRange, List<String> relFiles) throws TableNotFoundException {
+    Scanner scanner = conn.createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS);
+    scanner.setBatchSize(100);
+    scanner.setRange(nextRange);
+    
+    RowIterator rowIter = new RowIterator(scanner);
+    Iterator<Entry<Key,Value>> row = rowIter.next();
+    
+    KeyExtent extent = null;
+    String location = null;
+    
+    while (row.hasNext()) {
+      Entry<Key,Value> entry = row.next();
+      Key key = entry.getKey();
+
+      if (key.getColumnFamily().equals(Constants.METADATA_DATAFILE_COLUMN_FAMILY)) {
+        relFiles.add(key.getColumnQualifier().toString());
+      }
+      
+      if (key.getColumnFamily().equals(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY)
+          || key.getColumnFamily().equals(Constants.METADATA_FUTURE_LOCATION_COLUMN_FAMILY)) {
+        location = entry.getValue().toString();
+      }
+      
+      if (Constants.METADATA_PREV_ROW_COLUMN.hasColumns(key)) {
+        extent = new KeyExtent(key.getRow(), entry.getValue());
+      }
+
+    }
+    return new Pair<KeyExtent,String>(extent, location);
+  }
+
+  /**
+   * @param extent
+   * @param absFiles
+   * @return the scan-time iterator stack over the given files
+   * @throws AccumuloException
+   * @throws TableNotFoundException
+   * @throws IOException
+   */
+  private SortedKeyValueIterator<Key,Value> createIterator(KeyExtent extent, List<String> absFiles) throws TableNotFoundException, AccumuloException,
+      IOException {
+    
+    // TODO share code w/ tablet
+    AccumuloConfiguration acuTableConf = AccumuloConfiguration.getTableConfiguration(conn, tableId);
+    
+    Configuration conf = CachedConfiguration.getInstance();
+    
+    FileSystem fs = FileUtil.getFileSystem(conf, AccumuloConfiguration.getSiteConfiguration());
+    
+    for (SortedKeyValueIterator<Key,Value> reader : readers) {
+      ((FileSKVIterator) reader).close();
+    }
+    
+    readers.clear();
+
+    // TODO need to close files
+    for (String file : absFiles) {
+      FileSKVIterator reader = FileOperations.getInstance().openReader(file, false, fs, conf, acuTableConf, null, null);
+      readers.add(reader);
+    }
+    
+    MultiIterator multiIter = new MultiIterator(readers, extent);
+    
+    OfflineIteratorEnvironment iterEnv = new OfflineIteratorEnvironment();
+    
+    DeletingIterator delIter = new DeletingIterator(multiIter, false);
+    
+    ColumnQualifierFilter colFilter = new ColumnQualifierFilter(delIter, new HashSet<Column>(options.fetchedColumns));
+    
+    byte[] defaultSecurityLabel;
+    
+    ColumnVisibility cv = new ColumnVisibility(acuTableConf.get(Property.TABLE_DEFAULT_SCANTIME_VISIBILITY));
+    defaultSecurityLabel = cv.getExpression();
+    
+    VisibilityFilter visFilter = new VisibilityFilter(colFilter, authorizations, defaultSecurityLabel);
+    
+    return iterEnv.getTopLevelIterator(IteratorUtil.loadIterators(IteratorScope.scan, visFilter, extent, acuTableConf, options.serverSideIteratorList,
+        options.serverSideIteratorOptions, iterEnv, false));
+  }
+
+  @Override
+  public void remove() {
+    throw new UnsupportedOperationException();
+  }
+  
+}
+
+/**
+ * A Scanner that reads an offline table's files directly from HDFS instead of going through tablet servers.
+ */
+public class OfflineScanner extends ScannerOptions implements Scanner {
+  
+  private int batchSize;
+  private int timeOut;
+  private Range range;
+  
+  private Instance instance;
+  private AuthInfo credentials;
+  private Authorizations authorizations;
+  private Text tableId;
+  
+  public OfflineScanner(Instance instance, AuthInfo credentials, String tableId, Authorizations authorizations) {
+    ArgumentChecker.notNull(instance, credentials, tableId, authorizations);
+    this.instance = instance;
+    this.credentials = credentials;
+    this.tableId = new Text(tableId);
+    this.range = new Range((Key) null, (Key) null);
+
+    this.authorizations = authorizations;
+    
+    this.batchSize = Constants.SCAN_BATCH_SIZE;
+    this.timeOut = Integer.MAX_VALUE;
+  }
+
+  @Override
+  public void setTimeOut(int timeOut) {
+    this.timeOut = timeOut;
+  }
+  
+  @Override
+  public int getTimeOut() {
+    return timeOut;
+  }
+  
+  @Override
+  public void setRange(Range range) {
+    this.range = range;
+  }
+  
+  @Override
+  public Range getRange() {
+    return range;
+  }
+  
+  @Override
+  public void setBatchSize(int size) {
+    this.batchSize = size;
+  }
+  
+  @Override
+  public int getBatchSize() {
+    return batchSize;
+  }
+  
+  @Override
+  public void enableIsolation() {
+    
+  }
+  
+  @Override
+  public void disableIsolation() {
+    
+  }
+  
+  @Override
+  public Iterator<Entry<Key,Value>> iterator() {
+    return new OfflineIterator(this, instance, credentials, authorizations, tableId, range);
+  }
+
+}
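
For reference, a minimal sketch of driving the new OfflineScanner directly from client
code. This is not part of the commit; the instance, zookeeper, user, password, and table
names are hypothetical, and the table is assumed to already be offline.

    import java.nio.ByteBuffer;
    import java.util.Map.Entry;

    import org.apache.accumulo.core.client.Instance;
    import org.apache.accumulo.core.client.ZooKeeperInstance;
    import org.apache.accumulo.core.client.impl.OfflineScanner;
    import org.apache.accumulo.core.client.impl.Tables;
    import org.apache.accumulo.core.data.Key;
    import org.apache.accumulo.core.data.Range;
    import org.apache.accumulo.core.data.Value;
    import org.apache.accumulo.core.security.Authorizations;
    import org.apache.accumulo.core.security.thrift.AuthInfo;

    public class OfflineScanExample {
      public static void main(String[] args) throws Exception {
        Instance instance = new ZooKeeperInstance("myinstance", "zkhost1,zkhost2");
        AuthInfo credentials = new AuthInfo("user", ByteBuffer.wrap("pass".getBytes()), instance.getInstanceID());

        // OfflineScanner takes a table id, not a table name
        String tableId = Tables.getTableId(instance, "mytable");
        OfflineScanner scanner = new OfflineScanner(instance, credentials, tableId, new Authorizations());
        scanner.setRange(new Range());

        // iterates over the table's files in HDFS; no tablet servers are involved
        for (Entry<Key,Value> entry : scanner)
          System.out.println(entry.getKey() + " " + entry.getValue());
      }
    }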

Modified: incubator/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java?rev=1297790&r1=1297789&r2=1297790&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java (original)
+++ incubator/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java Tue Mar  6 23:23:29 2012
@@ -50,9 +50,13 @@ import org.apache.accumulo.core.client.C
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.IsolatedScanner;
 import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.RowIterator;
 import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableDeletedException;
 import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.TableOfflineException;
 import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.impl.OfflineScanner;
 import org.apache.accumulo.core.client.impl.Tables;
 import org.apache.accumulo.core.client.impl.TabletLocator;
 import org.apache.accumulo.core.client.mock.MockInstance;
@@ -65,10 +69,12 @@ import org.apache.accumulo.core.data.Ran
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 import org.apache.accumulo.core.iterators.user.VersioningIterator;
+import org.apache.accumulo.core.master.state.tables.TableState;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.TablePermission;
 import org.apache.accumulo.core.security.thrift.AuthInfo;
 import org.apache.accumulo.core.util.ArgumentChecker;
+import org.apache.accumulo.core.util.ColumnFQ;
 import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.core.util.TextUtil;
 import org.apache.accumulo.core.util.UtilWaitThread;
@@ -139,6 +145,8 @@ public abstract class InputFormatBase<K,
   private static final String ITERATORS_OPTIONS = PREFIX + ".iterators.options";
   private static final String ITERATORS_DELIM = ",";
   
+  private static final String READ_OFFLINE = PREFIX + ".read.offline";
+
   /**
    * @deprecated Use {@link #setIsolated(Configuration,boolean)} instead
    */
@@ -373,6 +381,32 @@ public abstract class InputFormatBase<K,
   }
   
   /**
+   * Enable reading offline tables. This will make the map reduce job directly read the table's files. If the table is not offline, then the job will fail. If
+   * the table comes online during the map reduce job, it is likely that the job will fail.
+   * 
+   * To use this option, the map reduce user will need access to read the accumulo directory in HDFS.
+   * 
+   * Reading the offline table will create the scan time iterator stack in the map process. So any iterators that are configured for the table will need to be
+   * on the mapper's classpath. The accumulo-site.xml may need to be on the mapper's classpath if HDFS or the accumulo directory in HDFS are non-standard.
+   * 
+   * One way to use this feature is to clone a table, take the clone offline, and use the clone as the input table for a map reduce job. If you plan to map
+   * reduce over the data many times, it may be better to compact the table, clone it, take it offline, and use the clone for all map reduce jobs. The
+   * reason to do this is that compaction will reduce each tablet in the table to one file, and it is faster to read from one file.
+   * 
+   * There are two possible advantages to reading a table's files directly out of HDFS. First, you may see better read performance. Second, it will better
+   * support speculative execution. When reading an online table, speculative execution can put more load on an already slow tablet server.
+   * 
+   * @param conf
+   *          the job configuration
+   * @param scanOff
+   *          pass true to read offline tables
+   */
+  public static void setScanOffline(Configuration conf, boolean scanOff) {
+    conf.setBoolean(READ_OFFLINE, scanOff);
+  }
+  
+  /**
    * @deprecated Use {@link #fetchColumns(Configuration,Collection)} instead
    */
  public static void fetchColumns(JobContext job, Collection<Pair<Text,Text>> columnFamilyColumnQualifierPairs) {
@@ -893,6 +927,12 @@ public abstract class InputFormatBase<K,
     return conf.getInt(MAX_VERSIONS, -1);
   }
   
+  protected static boolean isOfflineScan(Configuration conf) {
+    return conf.getBoolean(READ_OFFLINE, false);
+  }
+
+  // Return a list of the iterator settings (for iterators to apply to a scanner)
+
   /**
    * @deprecated Use {@link #getIterators(Configuration)} instead
    */
@@ -1069,7 +1109,12 @@ public abstract class InputFormatBase<K,
         Connector conn = instance.getConnector(user, password);
         log.debug("Creating scanner for table: " + getTablename(attempt.getConfiguration()));
         log.debug("Authorizations are: " + authorizations);
-        scanner = conn.createScanner(getTablename(attempt.getConfiguration()), authorizations);
+        if (isOfflineScan(attempt.getConfiguration())) {
+          scanner = new OfflineScanner(instance, new AuthInfo(user, ByteBuffer.wrap(password), instance.getInstanceID()), Tables.getTableId(instance,
+              getTablename(attempt.getConfiguration())), authorizations);
+        } else {
+          scanner = conn.createScanner(getTablename(attempt.getConfiguration()), authorizations);
+        }
         if (isIsolated(attempt.getConfiguration())) {
           log.info("Creating isolated scanner");
           scanner = new IsolatedScanner(scanner);
@@ -1128,6 +1173,106 @@ public abstract class InputFormatBase<K,
     }
   }
   
+  Map<String,Map<KeyExtent,List<Range>>> binOfflineTable(JobContext job, String tableName, List<Range> ranges) throws TableNotFoundException,
+      AccumuloException, AccumuloSecurityException {
+    
+    Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
+
+    Instance instance = getInstance(job.getConfiguration());
+    Connector conn = instance.getConnector(getUsername(job.getConfiguration()), getPassword(job.getConfiguration()));
+    String tableId = Tables.getTableId(instance, tableName);
+    
+    if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) {
+      Tables.clearCache(instance);
+      if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) {
+        throw new AccumuloException("Table is online " + tableName + "(" + tableId + ") cannot scan table in offline mode ");
+      }
+    }
+
+    for (Range range : ranges) {
+      Text startRow;
+      
+      if (range.getStartKey() != null)
+        startRow = range.getStartKey().getRow();
+      else
+        startRow = new Text();
+      
+      Range metadataRange = new Range(new KeyExtent(new Text(tableId), startRow, null).getMetadataEntry(), true, null, false);
+      Scanner scanner = conn.createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS);
+      ColumnFQ.fetch(scanner, Constants.METADATA_PREV_ROW_COLUMN);
+      scanner.fetchColumnFamily(Constants.METADATA_LAST_LOCATION_COLUMN_FAMILY);
+      scanner.fetchColumnFamily(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY);
+      scanner.fetchColumnFamily(Constants.METADATA_FUTURE_LOCATION_COLUMN_FAMILY);
+      scanner.setRange(metadataRange);
+      
+      RowIterator rowIter = new RowIterator(scanner);
+      
+      // TODO check that extents match prev extent
+
+      KeyExtent lastExtent = null;
+
+      while (rowIter.hasNext()) {
+        Iterator<Entry<Key,Value>> row = rowIter.next();
+        String last = "";
+        KeyExtent extent = null;
+        String location = null;
+        
+        while (row.hasNext()) {
+          Entry<Key,Value> entry = row.next();
+          Key key = entry.getKey();
+          
+          if (key.getColumnFamily().equals(Constants.METADATA_LAST_LOCATION_COLUMN_FAMILY)) {
+            last = entry.getValue().toString();
+          }
+          
+          if (key.getColumnFamily().equals(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY)
+              || key.getColumnFamily().equals(Constants.METADATA_FUTURE_LOCATION_COLUMN_FAMILY)) {
+            location = entry.getValue().toString();
+          }
+
+          if (Constants.METADATA_PREV_ROW_COLUMN.hasColumns(key)) {
+            extent = new KeyExtent(key.getRow(), entry.getValue());
+          }
+          
+        }
+        
+        if (location != null)
+          return null;
+
+        if (!extent.getTableId().toString().equals(tableId)) {
+          throw new AccumuloException("Saw unexpected table Id " + tableId + " " + extent);
+        }
+
+        if (lastExtent != null && !extent.isPreviousExtent(lastExtent)) {
+          throw new AccumuloException(" " + lastExtent + " is not previous extent " + extent);
+        }
+
+        Map<KeyExtent,List<Range>> tabletRanges = binnedRanges.get(last);
+        if (tabletRanges == null) {
+          tabletRanges = new HashMap<KeyExtent,List<Range>>();
+          binnedRanges.put(last, tabletRanges);
+        }
+        
+        List<Range> rangeList = tabletRanges.get(extent);
+        if (rangeList == null) {
+          rangeList = new ArrayList<Range>();
+          tabletRanges.put(extent, rangeList);
+        }
+        
+        rangeList.add(range);
+
+        if (extent.getEndRow() == null || range.afterEndKey(new Key(extent.getEndRow()).followingKey(PartialKey.ROW))) {
+          break;
+        }
+        
+        lastExtent = extent;
+      }
+
+    }
+    
+    return binnedRanges;
+  }
+
   /**
    * Read the metadata table to get tablets and match up ranges to them.
    */
@@ -1148,10 +1293,30 @@ public abstract class InputFormatBase<K,
     Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
     TabletLocator tl;
     try {
-      tl = getTabletLocator(job.getConfiguration());
-      while (!tl.binRanges(ranges, binnedRanges).isEmpty()) {
-        log.warn("Unable to locate bins for specified ranges. Retrying.");
-        UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between 100 and 200 ms
+      if (isOfflineScan(job.getConfiguration())) {
+        binnedRanges = binOfflineTable(job, tableName, ranges);
+        while (binnedRanges == null) {
+          // Some tablets were still online, try again
+          UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between 100 and 200 ms
+          binnedRanges = binOfflineTable(job, tableName, ranges);
+        }
+      } else {
+        Instance instance = getInstance(job.getConfiguration());
+        String tableId = null;
+        tl = getTabletLocator(job.getConfiguration());
+        while (!tl.binRanges(ranges, binnedRanges).isEmpty()) {
+          if (!(instance instanceof MockInstance)) {
+            if (tableId == null)
+              tableId = Tables.getTableId(instance, tableName);
+            if (!Tables.exists(instance, tableId))
+              throw new TableDeletedException(tableId);
+            if (Tables.getTableState(instance, tableId) == TableState.OFFLINE)
+              throw new TableOfflineException(instance, tableId);
+          }
+          binnedRanges.clear();
+          log.warn("Unable to locate bins for specified ranges. Retrying.");
+          UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between 100 and 200 ms
+        }
       }
     } catch (Exception e) {
       throw new IOException(e);
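
To tie the pieces together, here is a hedged sketch of the clone/offline/map reduce
workflow the setScanOffline javadoc above describes, distilled from the UniqueColumns
example later in this commit. Instance, zookeeper, user, password, and table names are
hypothetical, and the job setup is abbreviated.

    import java.util.HashMap;
    import java.util.HashSet;

    import org.apache.accumulo.core.client.Connector;
    import org.apache.accumulo.core.client.ZooKeeperInstance;
    import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
    import org.apache.accumulo.core.security.Authorizations;
    import org.apache.hadoop.mapreduce.Job;

    public class OfflineInputExample {
      public static void main(String[] args) throws Exception {
        Connector conn = new ZooKeeperInstance("myinstance", "zkhost1,zkhost2").getConnector("user", "pass".getBytes());

        // clone the table and take the clone offline so its set of files stops changing
        String clone = "mytable_clone";
        conn.tableOperations().clone("mytable", clone, true, new HashMap<String,String>(), new HashSet<String>());
        conn.tableOperations().offline(clone);

        Job job = new Job();
        job.setInputFormatClass(AccumuloInputFormat.class);
        AccumuloInputFormat.setZooKeeperInstance(job.getConfiguration(), "myinstance", "zkhost1,zkhost2");
        AccumuloInputFormat.setInputInfo(job.getConfiguration(), "user", "pass".getBytes(), clone, new Authorizations());
        // read the clone's files directly from HDFS instead of scanning tablet servers
        AccumuloInputFormat.setScanOffline(job.getConfiguration(), true);

        // ... set mapper, reducer, and output classes here, then run and clean up
        job.waitForCompletion(true);
        conn.tableOperations().delete(clone);
      }
    }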

Modified: incubator/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mock/MockScannerBase.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mock/MockScannerBase.java?rev=1297790&r1=1297789&r2=1297790&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mock/MockScannerBase.java (original)
+++ incubator/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mock/MockScannerBase.java Tue Mar  6 23:23:29 2012
@@ -33,8 +33,8 @@ import org.apache.accumulo.core.data.Key
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.iterators.IteratorEnvironment;
 import org.apache.accumulo.core.iterators.IteratorUtil;
-import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 import org.apache.accumulo.core.iterators.system.ColumnFamilySkippingIterator;
 import org.apache.accumulo.core.iterators.system.ColumnQualifierFilter;
 import org.apache.accumulo.core.iterators.system.DeletingIterator;
@@ -106,7 +106,7 @@ public class MockScannerBase extends Sca
     AccumuloConfiguration conf = new MockConfiguration(table.settings);
     MockIteratorEnvironment iterEnv = new MockIteratorEnvironment();
     SortedKeyValueIterator<Key,Value> result = iterEnv.getTopLevelIterator(IteratorUtil.loadIterators(IteratorScope.scan, vf, null, conf,
-        serverSideIteratorList, serverSideIteratorOptions, iterEnv));
+        serverSideIteratorList, serverSideIteratorOptions, iterEnv, false));
     return result;
   }
   

Modified: incubator/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/iterators/IteratorUtil.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/iterators/IteratorUtil.java?rev=1297790&r1=1297789&r2=1297790&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/iterators/IteratorUtil.java (original)
+++ incubator/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/iterators/IteratorUtil.java Tue Mar  6 23:23:29 2012
@@ -187,6 +187,12 @@ public class IteratorUtil {
  public static <K extends WritableComparable<?>,V extends Writable> SortedKeyValueIterator<K,V> loadIterators(IteratorScope scope,
      SortedKeyValueIterator<K,V> source, KeyExtent extent, AccumuloConfiguration conf, List<IterInfo> ssiList, Map<String,Map<String,String>> ssio,
       IteratorEnvironment env) throws IOException {
+    return loadIterators(scope, source, extent, conf, ssiList, ssio, env, true);
+  }
+  
+  public static <K extends WritableComparable<?>,V extends Writable> SortedKeyValueIterator<K,V> loadIterators(IteratorScope scope,
+      SortedKeyValueIterator<K,V> source, KeyExtent extent, AccumuloConfiguration conf, List<IterInfo> ssiList, Map<String,Map<String,String>> ssio,
+      IteratorEnvironment env, boolean useAccumuloClassLoader) throws IOException {
     List<IterInfo> iters = new ArrayList<IterInfo>(ssiList);
     Map<String,Map<String,String>> allOptions = new HashMap<String,Map<String,String>>();
     
@@ -203,18 +209,23 @@ public class IteratorUtil {
       }
     }
     
-    return loadIterators(source, iters, allOptions, env);
+    return loadIterators(source, iters, allOptions, env, useAccumuloClassLoader);
   }
   
+  @SuppressWarnings("unchecked")
  public static <K extends WritableComparable<?>,V extends Writable> SortedKeyValueIterator<K,V> loadIterators(SortedKeyValueIterator<K,V> source,
-      Collection<IterInfo> iters, Map<String,Map<String,String>> iterOpts, IteratorEnvironment env) throws IOException {
+      Collection<IterInfo> iters, Map<String,Map<String,String>> iterOpts, IteratorEnvironment env, boolean useAccumuloClassLoader) throws IOException {
     SortedKeyValueIterator<K,V> prev = source;
     
     try {
       for (IterInfo iterInfo : iters) {
-        @SuppressWarnings("unchecked")
-        Class<? extends SortedKeyValueIterator<K,V>> clazz = (Class<? extends SortedKeyValueIterator<K,V>>) AccumuloClassLoader.loadClass(iterInfo.className,
-            SortedKeyValueIterator.class);
+
+        Class<? extends SortedKeyValueIterator<K,V>> clazz;
+        if (useAccumuloClassLoader) {
+          clazz = (Class<? extends SortedKeyValueIterator<K,V>>) AccumuloClassLoader.loadClass(iterInfo.className, SortedKeyValueIterator.class);
+        } else {
+          clazz = (Class<? extends SortedKeyValueIterator<K,V>>) Class.forName(iterInfo.className).asSubclass(SortedKeyValueIterator.class);
+        }
         SortedKeyValueIterator<K,V> skvi = clazz.newInstance();
         
         Map<String,String> options = iterOpts.get(iterInfo.iterName);
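
The new boolean simply selects which class loader instantiates each configured iterator:
AccumuloClassLoader on the server side, or the plain JVM class loader for client-side
callers such as OfflineScanner and ClientSideIteratorScanner. A small sketch of calling
the new overload, assuming the 1.4 thrift IterInfo type; the helper name buildScanStack
is ours, not part of the commit.

    import java.io.IOException;
    import java.util.List;
    import java.util.Map;

    import org.apache.accumulo.core.conf.AccumuloConfiguration;
    import org.apache.accumulo.core.data.Key;
    import org.apache.accumulo.core.data.KeyExtent;
    import org.apache.accumulo.core.data.Value;
    import org.apache.accumulo.core.data.thrift.IterInfo;
    import org.apache.accumulo.core.iterators.IteratorEnvironment;
    import org.apache.accumulo.core.iterators.IteratorUtil;
    import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
    import org.apache.accumulo.core.iterators.SortedKeyValueIterator;

    public class ClientSideIteratorLoading {
      // builds the scan-time iterator stack with the JVM class loader; the
      // configured iterator classes must already be on the caller's classpath
      static SortedKeyValueIterator<Key,Value> buildScanStack(SortedKeyValueIterator<Key,Value> source,
          KeyExtent extent, AccumuloConfiguration conf, List<IterInfo> ssiList,
          Map<String,Map<String,String>> ssio, IteratorEnvironment env) throws IOException {
        return IteratorUtil.loadIterators(IteratorScope.scan, source, extent, conf, ssiList, ssio, env, false);
      }
    }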

Added: incubator/accumulo/branches/1.4/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/UniqueColumns.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/UniqueColumns.java?rev=1297790&view=auto
==============================================================================
--- incubator/accumulo/branches/1.4/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/UniqueColumns.java (added)
+++ incubator/accumulo/branches/1.4/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/UniqueColumns.java Tue Mar  6 23:23:29 2012
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.simple.mapreduce;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * A simple map reduce job that computes the unique column families and column qualifiers in a table. This example shows one way to run against an offline table.
+ */
+public class UniqueColumns extends Configured implements Tool {
+  
+  private static final Text EMPTY = new Text();
+  
+  public static class UMapper extends Mapper<Key,Value,Text,Text> {    
+    private Text temp = new Text();
+    private static final Text CF = new Text("cf:");
+    private static final Text CQ = new Text("cq:");
+    
+    public void map(Key key, Value value, Context context) throws IOException, InterruptedException {
+      temp.set(CF);
+      ByteSequence cf = key.getColumnFamilyData();
+      temp.append(cf.getBackingArray(), cf.offset(), cf.length());
+      context.write(temp, EMPTY);
+      
+      temp.set(CQ);
+      ByteSequence cq = key.getColumnQualifierData();
+      temp.append(cq.getBackingArray(), cq.offset(), cq.length());
+      context.write(temp, EMPTY);
+    }
+  }
+  
+  public static class UReducer extends Reducer<Text,Text,Text,Text> {
+    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
+      context.write(key, EMPTY);
+    }
+  }
+  
+  
+  @Override
+  public int run(String[] args) throws Exception {
+    
+    if (args.length != 8) {
+      throw new IllegalArgumentException("Usage : " + UniqueColumns.class.getSimpleName()
+          + " <instance name> <zookeepers> <user> <password> <table>
<output directory> <num reducers> offline|online");
+    }
+
+    boolean scanOffline = args[7].equals("offline");
+    String table = args[4];
+    String jobName = this.getClass().getSimpleName() + "_" + System.currentTimeMillis();
+    
+    Job job = new Job(getConf(), jobName);
+    job.setJarByClass(this.getClass());
+
+    String clone = table;
+    Connector conn = null;
+    if (scanOffline) {
+      /*
+       * this example clones the table and takes it offline. If you plan to run map reduce jobs over a table many times, it may be more efficient to compact the
+       * table, clone it, and then keep using the same clone as input for map reduce.
+       */
+      
+      ZooKeeperInstance zki = new ZooKeeperInstance(args[0], args[1]);
+      conn = zki.getConnector(args[2], args[3].getBytes());
+      clone = table + "_" + jobName;
+      conn.tableOperations().clone(table, clone, true, new HashMap<String,String>(), new HashSet<String>());
+      conn.tableOperations().offline(clone);
+      
+      AccumuloInputFormat.setScanOffline(job.getConfiguration(), true);
+    }
+
+    job.setInputFormatClass(AccumuloInputFormat.class);
+    AccumuloInputFormat.setZooKeeperInstance(job.getConfiguration(), args[0], args[1]);
+    AccumuloInputFormat.setInputInfo(job.getConfiguration(), args[2], args[3].getBytes(), clone, new Authorizations());
+
+    job.setMapperClass(UMapper.class);
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(Text.class);
+    
+    job.setCombinerClass(UReducer.class);
+    job.setReducerClass(UReducer.class);
+
+    job.setNumReduceTasks(Integer.parseInt(args[6]));
+
+    job.setOutputFormatClass(TextOutputFormat.class);
+    TextOutputFormat.setOutputPath(job, new Path(args[5]));
+    
+    job.waitForCompletion(true);
+    
+    if (scanOffline) {
+      conn.tableOperations().delete(clone);
+    }
+
+    return job.isSuccessful() ? 0 : 1;
+  }
+
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(CachedConfiguration.getInstance(), new UniqueColumns(), args);
+    System.exit(res);
+  }
+}

Modified: incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousVerify.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousVerify.java?rev=1297790&r1=1297789&r2=1297790&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousVerify.java (original)
+++ incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousVerify.java Tue Mar  6 23:23:29 2012
@@ -18,8 +18,12 @@ package org.apache.accumulo.server.test.
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Random;
 import java.util.Set;
 
+import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.ZooKeeperInstance;
 import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
 import org.apache.accumulo.core.data.Key;
@@ -130,10 +134,10 @@ public class ContinuousVerify extends Co
   }
   
   @Override
-  public int run(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
-    if (args.length != 8) {
+  public int run(String[] args) throws Exception {
+    if (args.length != 9) {
       throw new IllegalArgumentException("Usage : " + ContinuousVerify.class.getName()
-          + " <instance name> <zookeepers> <user> <pass> <table>
<output dir> <max mappers> <num reducers>");
+          + " <instance name> <zookeepers> <user> <pass> <table>
<output dir> <max mappers> <num reducers> <scan offline>");
     }
     
     String instance = args[0];
@@ -144,14 +148,26 @@ public class ContinuousVerify extends Co
     String outputdir = args[5];
     String maxMaps = args[6];
     String reducers = args[7];
+    boolean scanOffline = Boolean.parseBoolean(args[8]);
     
     Job job = new Job(getConf(), this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
     job.setJarByClass(this.getClass());
     
+    String clone = table;
+    Connector conn = null;
+    if (scanOffline) {
+      Random random = new Random();
+      clone = table + "_" + String.format("%016x", Math.abs(random.nextLong()));
+      ZooKeeperInstance zki = new ZooKeeperInstance(instance, zookeepers);
+      conn = zki.getConnector(user, pass.getBytes());
+      conn.tableOperations().clone(table, clone, true, new HashMap<String,String>(), new HashSet<String>());
+      conn.tableOperations().offline(clone);
+    }
+
     job.setInputFormatClass(AccumuloInputFormat.class);
-    AccumuloInputFormat.setInputInfo(job.getConfiguration(), user, pass.getBytes(), table, new Authorizations());
+    AccumuloInputFormat.setInputInfo(job.getConfiguration(), user, pass.getBytes(), clone, new Authorizations());
     AccumuloInputFormat.setZooKeeperInstance(job.getConfiguration(), instance, zookeepers);
-    
+    AccumuloInputFormat.setScanOffline(job.getConfiguration(), scanOffline);
     // set up ranges
     try {
       Set<Range> ranges = new ZooKeeperInstance(instance, zookeepers).getConnector(user, pass.getBytes()).tableOperations()
@@ -170,9 +186,17 @@ public class ContinuousVerify extends Co
     job.setNumReduceTasks(Integer.parseInt(reducers));
     
     job.setOutputFormatClass(TextOutputFormat.class);
+    
+    job.getConfiguration().setBoolean("mapred.map.tasks.speculative.execution", scanOffline);
+
     TextOutputFormat.setOutputPath(job, new Path(outputdir));
     
     job.waitForCompletion(true);
+    
+    if (scanOffline) {
+      conn.tableOperations().delete(clone);
+    }
+
     return job.isSuccessful() ? 0 : 1;
   }
   

Modified: incubator/accumulo/branches/1.4/test/system/continuous/continuous-env.sh.example
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/test/system/continuous/continuous-env.sh.example?rev=1297790&r1=1297789&r2=1297790&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/test/system/continuous/continuous-env.sh.example (original)
+++ incubator/accumulo/branches/1.4/test/system/continuous/continuous-env.sh.example Tue Mar  6 23:23:29 2012
@@ -77,6 +77,7 @@ MASTER_RESTART_SLEEP_TIME=2
 VERFIY_OUT=/tmp/continuous_verify
 VERIFY_MAX_MAPS=64
 VERIFY_REDUCERS=64
+SCAN_OFFLINE=false
 
 #settings related to the batch walker
 BATCH_WALKER_SLEEP=180000

Modified: incubator/accumulo/branches/1.4/test/system/continuous/run-verify.sh
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/test/system/continuous/run-verify.sh?rev=1297790&r1=1297789&r2=1297790&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/test/system/continuous/run-verify.sh (original)
+++ incubator/accumulo/branches/1.4/test/system/continuous/run-verify.sh Tue Mar  6 23:23:29 2012
@@ -18,5 +18,5 @@
 
 . mapred-setup.sh
 
-$ACCUMULO_HOME/bin/tool.sh "$SERVER_LIBJAR" org.apache.accumulo.server.test.continuous.ContinuousVerify -libjars "$SERVER_LIBJAR" $INSTANCE_NAME $ZOO_KEEPERS $USER $PASS $TABLE $VERFIY_OUT $VERIFY_MAX_MAPS $VERIFY_REDUCERS
+$ACCUMULO_HOME/bin/tool.sh "$SERVER_LIBJAR" org.apache.accumulo.server.test.continuous.ContinuousVerify -libjars "$SERVER_LIBJAR" $INSTANCE_NAME $ZOO_KEEPERS $USER $PASS $TABLE $VERFIY_OUT $VERIFY_MAX_MAPS $VERIFY_REDUCERS $SCAN_OFFLINE
 


