accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vi...@apache.org
Subject svn commit: r1391754 [2/11] - in /accumulo/branches/ACCUMULO-259: ./ assemble/ conf/ conf/examples/1GB/native-standalone/ conf/examples/1GB/standalone/ conf/examples/2GB/native-standalone/ conf/examples/2GB/standalone/ conf/examples/3GB/native-standalo...
Date Sat, 29 Sep 2012 05:43:13 GMT
Modified: accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/bloomfilter/DynamicBloomFilter.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/bloomfilter/DynamicBloomFilter.java?rev=1391754&r1=1391753&r2=1391754&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/bloomfilter/DynamicBloomFilter.java (original)
+++ accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/bloomfilter/DynamicBloomFilter.java Sat Sep 29 05:42:59 2012
@@ -113,7 +113,7 @@ public class DynamicBloomFilter extends 
    * @param nr
    *          The threshold for the maximum number of keys to record in a dynamic Bloom filter row.
    */
-  public DynamicBloomFilter(int vectorSize, int nbHash, int hashType, int nr) {
+  public DynamicBloomFilter(final int vectorSize, final int nbHash, final int hashType, final int nr) {
     super(vectorSize, nbHash, hashType);
     
     this.nr = nr;
@@ -124,7 +124,7 @@ public class DynamicBloomFilter extends 
   }
   
   @Override
-  public boolean add(Key key) {
+  public boolean add(final Key key) {
     if (key == null) {
       throw new NullPointerException("Key can not be null");
     }
@@ -146,7 +146,7 @@ public class DynamicBloomFilter extends 
   }
   
   @Override
-  public void and(Filter filter) {
+  public void and(final Filter filter) {
     if (filter == null || !(filter instanceof DynamicBloomFilter) || filter.vectorSize != this.vectorSize || filter.nbHash != this.nbHash) {
       throw new IllegalArgumentException("filters cannot be and-ed");
     }
@@ -163,7 +163,7 @@ public class DynamicBloomFilter extends 
   }
   
   @Override
-  public boolean membershipTest(Key key) {
+  public boolean membershipTest(final Key key) {
     if (key == null) {
       return true;
     }
@@ -185,7 +185,7 @@ public class DynamicBloomFilter extends 
   }
   
   @Override
-  public void or(Filter filter) {
+  public void or(final Filter filter) {
     if (filter == null || !(filter instanceof DynamicBloomFilter) || filter.vectorSize != this.vectorSize || filter.nbHash != this.nbHash) {
       throw new IllegalArgumentException("filters cannot be or-ed");
     }
@@ -201,7 +201,7 @@ public class DynamicBloomFilter extends 
   }
   
   @Override
-  public void xor(Filter filter) {
+  public void xor(final Filter filter) {
     if (filter == null || !(filter instanceof DynamicBloomFilter) || filter.vectorSize != this.vectorSize || filter.nbHash != this.nbHash) {
       throw new IllegalArgumentException("filters cannot be xor-ed");
     }
@@ -230,7 +230,7 @@ public class DynamicBloomFilter extends 
   // Writable
   
   @Override
-  public void write(DataOutput out) throws IOException {
+  public void write(final DataOutput out) throws IOException {
     super.write(out);
     out.writeInt(nr);
     out.writeInt(currentNbRecord);
@@ -241,7 +241,7 @@ public class DynamicBloomFilter extends 
   }
   
   @Override
-  public void readFields(DataInput in) throws IOException {
+  public void readFields(final DataInput in) throws IOException {
     
     super.readFields(in);
     

Modified: accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/bloomfilter/Filter.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/bloomfilter/Filter.java?rev=1391754&r1=1391753&r2=1391754&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/bloomfilter/Filter.java (original)
+++ accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/bloomfilter/Filter.java Sat Sep 29 05:42:59 2012
@@ -101,7 +101,7 @@ public abstract class Filter implements 
    * @param hashType
    *          type of the hashing function (see {@link Hash}).
    */
-  protected Filter(int vectorSize, int nbHash, int hashType) {
+  protected Filter(final int vectorSize, final int nbHash, final int hashType) {
     this.vectorSize = vectorSize;
     this.nbHash = nbHash;
     this.hashType = hashType;
@@ -169,7 +169,7 @@ public abstract class Filter implements 
    * @param keys
    *          The list of keys.
    */
-  public void add(List<Key> keys) {
+  public void add(final List<Key> keys) {
     if (keys == null) {
       throw new IllegalArgumentException("ArrayList<Key> may not be null");
     }
@@ -185,7 +185,7 @@ public abstract class Filter implements 
    * @param keys
    *          The collection of keys.
    */
-  public void add(Collection<Key> keys) {
+  public void add(final Collection<Key> keys) {
     if (keys == null) {
       throw new IllegalArgumentException("Collection<Key> may not be null");
     }
@@ -211,7 +211,7 @@ public abstract class Filter implements 
   
   // Writable interface
   
-  public void write(DataOutput out) throws IOException {
+  public void write(final DataOutput out) throws IOException {
     out.writeInt(VERSION);
     out.writeInt(this.nbHash);
     out.writeByte(this.hashType);
@@ -226,8 +226,8 @@ public abstract class Filter implements 
     return VERSION;
   }
   
-  public void readFields(DataInput in) throws IOException {
-    int ver = in.readInt();
+  public void readFields(final DataInput in) throws IOException {
+    final int ver = in.readInt();
     rVersion = ver;
     if (ver > 0) { // old unversioned format
       this.nbHash = ver;

Modified: accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/AccumuloException.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/AccumuloException.java?rev=1391754&r1=1391753&r2=1391754&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/AccumuloException.java (original)
+++ accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/AccumuloException.java Sat Sep 29 05:42:59 2012
@@ -28,7 +28,7 @@ public class AccumuloException extends E
    * @param why
    *          is the reason for the error being thrown
    */
-  public AccumuloException(String why) {
+  public AccumuloException(final String why) {
     super(why);
   }
   
@@ -36,7 +36,7 @@ public class AccumuloException extends E
    * @param cause
    *          is the exception that this exception wraps
    */
-  public AccumuloException(Throwable cause) {
+  public AccumuloException(final Throwable cause) {
     super(cause);
   }
   
@@ -46,7 +46,7 @@ public class AccumuloException extends E
    * @param cause
    *          is the exception that this exception wraps
    */
-  public AccumuloException(String why, Throwable cause) {
+  public AccumuloException(final String why, final Throwable cause) {
     super(why, cause);
   }
   

Modified: accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/AccumuloSecurityException.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/AccumuloSecurityException.java?rev=1391754&r1=1391753&r2=1391754&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/AccumuloSecurityException.java (original)
+++ accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/AccumuloSecurityException.java Sat Sep 29 05:42:59 2012
@@ -26,7 +26,7 @@ import org.apache.accumulo.core.security
 public class AccumuloSecurityException extends Exception {
   private static final long serialVersionUID = 1L;
   
-  private static String getDefaultErrorMessage(SecurityErrorCode errorcode) {
+  private static String getDefaultErrorMessage(final SecurityErrorCode errorcode) {
     switch (errorcode) {
       case BAD_CREDENTIALS:
         return "Username or Password is Invalid";
@@ -68,7 +68,7 @@ public class AccumuloSecurityException e
    * @param cause
    *          the exception that caused this violation
    */
-  public AccumuloSecurityException(String user, SecurityErrorCode errorcode, Throwable cause) {
+  public AccumuloSecurityException(final String user, final SecurityErrorCode errorcode, final Throwable cause) {
     super(getDefaultErrorMessage(errorcode), cause);
     this.user = user;
     this.errorCode = errorcode == null ? SecurityErrorCode.DEFAULT_SECURITY_ERROR : errorcode;
@@ -80,7 +80,7 @@ public class AccumuloSecurityException e
    * @param errorcode
    *          the specific reason for this exception
    */
-  public AccumuloSecurityException(String user, SecurityErrorCode errorcode) {
+  public AccumuloSecurityException(final String user, final SecurityErrorCode errorcode) {
     super(getDefaultErrorMessage(errorcode));
     this.user = user;
     this.errorCode = errorcode == null ? SecurityErrorCode.DEFAULT_SECURITY_ERROR : errorcode;

Modified: accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/BatchScanner.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/BatchScanner.java?rev=1391754&r1=1391753&r2=1391754&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/BatchScanner.java (original)
+++ accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/BatchScanner.java Sat Sep 29 05:42:59 2012
@@ -17,6 +17,7 @@
 package org.apache.accumulo.core.client;
 
 import java.util.Collection;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.accumulo.core.data.Range;
 
@@ -44,4 +45,19 @@ public interface BatchScanner extends Sc
    * Cleans up and finalizes the scanner
    */
   void close();
+  
+  /**
+   * Sets a timeout threshold for a server to respond. The batch scanner will accomplish as much work as possible before throwing an exception. BatchScanner
+   * iterators will throw a {@link TimedOutException} when all needed servers time out. Setting the timeout to zero or Long.MAX_VALUE and TimeUnit.MILLISECONDS
+   * means no timeout.
+   * 
+   * <p>
+   * If not set, there is no timeout. The BatchScanner will retry forever.
+   * 
+   * @param timeout
+   * @param timeUnit
+   *          determines how timeout is interpreted
+   */
+  @Override
+  void setTimeout(long timeout, TimeUnit timeUnit);
 }

Modified: accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java?rev=1391754&r1=1391753&r2=1391754&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java (original)
+++ accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java Sat Sep 29 05:42:59 2012
@@ -24,6 +24,7 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.accumulo.core.client.impl.ScannerOptions;
 import org.apache.accumulo.core.client.mock.IteratorAdapter;
@@ -53,7 +54,6 @@ import org.apache.hadoop.io.Text;
  */
 public class ClientSideIteratorScanner extends ScannerOptions implements Scanner {
   private int size;
-  private int timeOut;
   
   private Range range;
   private boolean isolated = false;
@@ -72,12 +72,12 @@ public class ClientSideIteratorScanner e
      * @param scanner
      *          the scanner to iterate over
      */
-    public ScannerTranslator(Scanner scanner) {
+    public ScannerTranslator(final Scanner scanner) {
       this.scanner = scanner;
     }
     
     @Override
-    public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
+    public void init(final SortedKeyValueIterator<Key,Value> source, final Map<String,String> options, final IteratorEnvironment env) throws IOException {
       throw new UnsupportedOperationException();
     }
     
@@ -95,9 +95,10 @@ public class ClientSideIteratorScanner e
     }
     
     @Override
-    public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
-      if (!inclusive && columnFamilies.size() > 0)
+    public void seek(final Range range, final Collection<ByteSequence> columnFamilies, final boolean inclusive) throws IOException {
+      if (!inclusive && columnFamilies.size() > 0) {
         throw new IllegalArgumentException();
+      }
       scanner.setRange(range);
       scanner.clearColumns();
       for (ByteSequence colf : columnFamilies) {
@@ -118,7 +119,7 @@ public class ClientSideIteratorScanner e
     }
     
     @Override
-    public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+    public SortedKeyValueIterator<Key,Value> deepCopy(final IteratorEnvironment env) {
       return new ScannerTranslator(scanner);
     }
   }
@@ -131,11 +132,11 @@ public class ClientSideIteratorScanner e
    * @param scanner
    *          the source scanner
    */
-  public ClientSideIteratorScanner(Scanner scanner) {
+  public ClientSideIteratorScanner(final Scanner scanner) {
     smi = new ScannerTranslator(scanner);
     this.range = scanner.getRange();
     this.size = scanner.getBatchSize();
-    this.timeOut = scanner.getTimeOut();
+    this.timeOut = scanner.getTimeout(TimeUnit.MILLISECONDS);
   }
   
   /**
@@ -143,20 +144,20 @@ public class ClientSideIteratorScanner e
    * 
    * @param scanner
    */
-  public void setSource(Scanner scanner) {
+  public void setSource(final Scanner scanner) {
     smi = new ScannerTranslator(scanner);
   }
   
   @Override
   public Iterator<Entry<Key,Value>> iterator() {
     smi.scanner.setBatchSize(size);
-    smi.scanner.setTimeOut(timeOut);
+    smi.scanner.setTimeout(timeOut, TimeUnit.MILLISECONDS);
     if (isolated)
       smi.scanner.enableIsolation();
     else
       smi.scanner.disableIsolation();
     
-    TreeMap<Integer,IterInfo> tm = new TreeMap<Integer,IterInfo>();
+    final TreeMap<Integer,IterInfo> tm = new TreeMap<Integer,IterInfo>();
     
     for (IterInfo iterInfo : serverSideIteratorList) {
       tm.put(iterInfo.getPriority(), iterInfo);
@@ -166,7 +167,7 @@ public class ClientSideIteratorScanner e
     try {
       skvi = IteratorUtil.loadIterators(smi, tm.values(), serverSideIteratorOptions, new IteratorEnvironment() {
         @Override
-        public SortedKeyValueIterator<Key,Value> reserveMapFileReader(String mapFileName) throws IOException {
+        public SortedKeyValueIterator<Key,Value> reserveMapFileReader(final String mapFileName) throws IOException {
           return null;
         }
         
@@ -186,13 +187,13 @@ public class ClientSideIteratorScanner e
         }
         
         @Override
-        public void registerSideChannel(SortedKeyValueIterator<Key,Value> iter) {}
+        public void registerSideChannel(final SortedKeyValueIterator<Key,Value> iter) {}
       }, false);
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
     
-    Set<ByteSequence> colfs = new TreeSet<ByteSequence>();
+    final Set<ByteSequence> colfs = new TreeSet<ByteSequence>();
     for (Column c : this.getFetchedColumns()) {
       colfs.add(new ArrayByteSequence(c.getColumnFamily()));
     }
@@ -208,16 +209,22 @@ public class ClientSideIteratorScanner e
   
   @Override
   public void setTimeOut(int timeOut) {
-    this.timeOut = timeOut;
+    if (timeOut == Integer.MAX_VALUE)
+      setTimeout(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+    else
+      setTimeout(timeOut, TimeUnit.SECONDS);
   }
   
   @Override
   public int getTimeOut() {
-    return timeOut;
+    long timeout = getTimeout(TimeUnit.SECONDS);
+    if (timeout >= Integer.MAX_VALUE)
+      return Integer.MAX_VALUE;
+    return (int) timeout;
   }
   
   @Override
-  public void setRange(Range range) {
+  public void setRange(final Range range) {
     this.range = range;
   }
   
@@ -227,7 +234,7 @@ public class ClientSideIteratorScanner e
   }
   
   @Override
-  public void setBatchSize(int size) {
+  public void setBatchSize(final int size) {
     this.size = size;
   }
   

Modified: accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/Connector.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/Connector.java?rev=1391754&r1=1391753&r2=1391754&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/Connector.java (original)
+++ accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/Connector.java Sat Sep 29 05:42:59 2012
@@ -99,13 +99,36 @@ public class Connector {
    * @return BatchDeleter object for configuring and deleting
    * @throws TableNotFoundException
    *           when the specified table doesn't exist
+   * @deprecated As of 1.5, replaced by {@link #createBatchDeleter(String, Authorizations, int, BatchWriterConfig)}
    */
+  @Deprecated
   public BatchDeleter createBatchDeleter(String tableName, Authorizations authorizations, int numQueryThreads, long maxMemory, long maxLatency,
       int maxWriteThreads) throws TableNotFoundException {
     return impl.createBatchDeleter(tableName, authorizations, numQueryThreads, maxMemory, maxLatency, maxWriteThreads);
   }
   
   /**
+   * 
+   * @param tableName
+   *          the name of the table to query and delete from
+   * @param authorizations
+   *          A set of authorization labels that will be checked against the column visibility of each key in order to filter data. The authorizations passed in
+   *          must be a subset of the accumulo user's set of authorizations. If the accumulo user has authorizations (A1, A2) and authorizations (A2, A3) are
+   *          passed, then an exception will be thrown.
+   * @param numQueryThreads
+   *          the number of concurrent threads to spawn for querying
+   * @param config
+   *          configuration used to create batch writer
+   * @return BatchDeleter object for configuring and deleting
+   * @throws TableNotFoundException
+   */
+
+  public BatchDeleter createBatchDeleter(String tableName, Authorizations authorizations, int numQueryThreads, BatchWriterConfig config)
+      throws TableNotFoundException {
+    return impl.createBatchDeleter(tableName, authorizations, numQueryThreads, config);
+  }
+
+  /**
    * Factory method to create a BatchWriter connected to Accumulo.
    * 
    * @param tableName
@@ -120,12 +143,29 @@ public class Connector {
    * @return BatchWriter object for configuring and writing data to
    * @throws TableNotFoundException
    *           when the specified table doesn't exist
+   * @deprecated As of 1.5, replaced by {@link #createBatchWriter(String, BatchWriterConfig)}
    */
+  @Deprecated
   public BatchWriter createBatchWriter(String tableName, long maxMemory, long maxLatency, int maxWriteThreads) throws TableNotFoundException {
     return impl.createBatchWriter(tableName, maxMemory, maxLatency, maxWriteThreads);
   }
   
   /**
+   * Factory method to create a BatchWriter connected to Accumulo.
+   * 
+   * @param tableName
+   *          the name of the table to insert data into
+   * @param config
+   *          configuration used to create batch writer
+   * @return BatchWriter object for configuring and writing data to
+   * @throws TableNotFoundException
+   */
+
+  public BatchWriter createBatchWriter(String tableName, BatchWriterConfig config) throws TableNotFoundException {
+    return impl.createBatchWriter(tableName, config);
+  }
+
+  /**
    * Factory method to create a Multi-Table BatchWriter connected to Accumulo. Multi-table batch writers can queue data for multiple tables, which is good for
    * ingesting data into multiple tables from the same source
    * 
@@ -137,12 +177,27 @@ public class Connector {
    *          the maximum number of threads to use for writing data to the tablet servers
    * 
    * @return MultiTableBatchWriter object for configuring and writing data to
+   * @deprecated As of 1.5, replaced by {@link #createMultiTableBatchWriter(BatchWriterConfig)}
    */
+  @Deprecated
   public MultiTableBatchWriter createMultiTableBatchWriter(long maxMemory, long maxLatency, int maxWriteThreads) {
     return impl.createMultiTableBatchWriter(maxMemory, maxLatency, maxWriteThreads);
   }
   
   /**
+   * Factory method to create a Multi-Table BatchWriter connected to Accumulo. Multi-table batch writers can queue data for multiple tables. Also data for
+   * multiple tables can be sent to a server in a single batch. It's an efficient way to ingest data into multiple tables from a single process.
+   * 
+   * @param config
+   *          configuration used to create multi-table batch writer
+   * @return MultiTableBatchWriter object for configuring and writing data to
+   */
+
+  public MultiTableBatchWriter createMultiTableBatchWriter(BatchWriterConfig config) {
+    return impl.createMultiTableBatchWriter(config);
+  }
+  
+  /**
    * Factory method to create a Scanner connected to Accumulo.
    * 
    * @param tableName

Modified: accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/IsolatedScanner.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/IsolatedScanner.java?rev=1391754&r1=1391753&r2=1391754&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/IsolatedScanner.java (original)
+++ accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/IsolatedScanner.java Sat Sep 29 05:42:59 2012
@@ -19,6 +19,7 @@ package org.apache.accumulo.core.client;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.accumulo.core.client.impl.IsolationException;
 import org.apache.accumulo.core.client.impl.ScannerOptions;
@@ -45,11 +46,11 @@ public class IsolatedScanner extends Sca
     private Entry<Key,Value> nextRowStart;
     private Iterator<Entry<Key,Value>> rowIter;
     private ByteSequence lastRow = null;
+    private long timeout;
     
     private Scanner scanner;
     private ScannerOptions opts;
     private Range range;
-    private int timeOut;
     private int batchSize;
     
     private void readRow() {
@@ -120,7 +121,7 @@ public class IsolatedScanner extends Sca
       synchronized (scanner) {
         scanner.enableIsolation();
         scanner.setBatchSize(batchSize);
-        scanner.setTimeOut(timeOut);
+        scanner.setTimeout(timeout, TimeUnit.MILLISECONDS);
         scanner.setRange(r);
         setOptions((ScannerOptions) scanner, opts);
         
@@ -129,11 +130,11 @@ public class IsolatedScanner extends Sca
       }
     }
     
-    public RowBufferingIterator(Scanner scanner, ScannerOptions opts, Range range, int timeOut, int batchSize, RowBufferFactory bufferFactory) {
+    public RowBufferingIterator(Scanner scanner, ScannerOptions opts, Range range, long timeout, int batchSize, RowBufferFactory bufferFactory) {
       this.scanner = scanner;
       this.opts = new ScannerOptions(opts);
       this.range = range;
-      this.timeOut = timeOut;
+      this.timeout = timeout;
       this.batchSize = batchSize;
       
       buffer = bufferFactory.newBuffer();
@@ -208,7 +209,6 @@ public class IsolatedScanner extends Sca
   
   private Scanner scanner;
   private Range range;
-  private int timeOut;
   private int batchSize;
   private RowBufferFactory bufferFactory;
   
@@ -219,7 +219,7 @@ public class IsolatedScanner extends Sca
   public IsolatedScanner(Scanner scanner, RowBufferFactory bufferFactory) {
     this.scanner = scanner;
     this.range = scanner.getRange();
-    this.timeOut = scanner.getTimeOut();
+    this.timeOut = scanner.getTimeout(TimeUnit.MILLISECONDS);
     this.batchSize = scanner.getBatchSize();
     this.bufferFactory = bufferFactory;
   }
@@ -231,12 +231,18 @@ public class IsolatedScanner extends Sca
   
   @Override
   public void setTimeOut(int timeOut) {
-    this.timeOut = timeOut;
+    if (timeOut == Integer.MAX_VALUE)
+      setTimeout(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+    else
+      setTimeout(timeOut, TimeUnit.SECONDS);
   }
   
   @Override
   public int getTimeOut() {
-    return timeOut;
+    long timeout = getTimeout(TimeUnit.SECONDS);
+    if (timeout >= Integer.MAX_VALUE)
+      return Integer.MAX_VALUE;
+    return (int) timeout;
   }
   
   @Override

Modified: accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/IteratorSetting.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/IteratorSetting.java?rev=1391754&r1=1391753&r2=1391754&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/IteratorSetting.java (original)
+++ accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/IteratorSetting.java Sat Sep 29 05:42:59 2012
@@ -16,6 +16,9 @@
  */
 package org.apache.accumulo.core.client;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -28,6 +31,8 @@ import org.apache.accumulo.core.iterator
 import org.apache.accumulo.core.util.ArgumentChecker;
 import org.apache.accumulo.core.util.Pair;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
 
 /**
  * Configure an iterator for minc, majc, and/or scan. By default, IteratorSetting will be configured for scan.
@@ -42,7 +47,7 @@ import org.apache.hadoop.io.Text;
  * scanner.addScanIterator(cfg);
  * </pre>
  */
-public class IteratorSetting {
+public class IteratorSetting implements Writable {
   private int priority;
   private String name;
   private String iteratorClass;
@@ -185,6 +190,11 @@ public class IteratorSetting {
     this(priority, name, iteratorClass.getName());
   }
   
+  public IteratorSetting(DataInput din) throws IOException {
+    this.properties = new HashMap<String,String>();
+    this.readFields(din);
+  }
+
   /**
    * Add another option to the iterator.
    * 
@@ -297,4 +307,29 @@ public class IteratorSetting {
     }
     
   }
+  
+  @Override
+  public void readFields(DataInput din) throws IOException {
+    priority = WritableUtils.readVInt(din);
+    name = WritableUtils.readString(din);
+    iteratorClass = WritableUtils.readString(din);
+    properties.clear();
+    int size = WritableUtils.readVInt(din);
+    while (size > 0) {
+      properties.put(WritableUtils.readString(din), WritableUtils.readString(din));
+      size--;
+    }
+  }
+  
+  @Override
+  public void write(DataOutput dout) throws IOException {
+    WritableUtils.writeVInt(dout, priority);
+    WritableUtils.writeString(dout, name);
+    WritableUtils.writeString(dout, iteratorClass);
+    WritableUtils.writeVInt(dout, properties.size());
+    for (Entry<String,String> e : properties.entrySet()) {
+      WritableUtils.writeString(dout, e.getKey());
+      WritableUtils.writeString(dout, e.getValue());
+    }
+  }
 }

Modified: accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/Scanner.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/Scanner.java?rev=1391754&r1=1391753&r2=1391754&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/Scanner.java (original)
+++ accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/Scanner.java Sat Sep 29 05:42:59 2012
@@ -32,14 +32,18 @@ public interface Scanner extends Scanner
    * 
    * @param timeOut
    *          in seconds
+   * @deprecated Since 1.5. See {@link ScannerBase#setTimeout(long, java.util.concurrent.TimeUnit)}
    */
+  @Deprecated
   public void setTimeOut(int timeOut);
   
   /**
    * Returns the setting for how long a scanner will automatically retry when a failure occurs.
    * 
    * @return the timeout configured for this scanner
+   * @deprecated Since 1.5. See {@link ScannerBase#getTimeout(java.util.concurrent.TimeUnit)}
    */
+  @Deprecated
   public int getTimeOut();
   
   /**

Modified: accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java?rev=1391754&r1=1391753&r2=1391754&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java (original)
+++ accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java Sat Sep 29 05:42:59 2012
@@ -18,6 +18,7 @@ package org.apache.accumulo.core.client;
 
 import java.util.Iterator;
 import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
@@ -100,4 +101,22 @@ public interface ScannerBase extends Ite
    * @return an iterator over Key,Value pairs which meet the restrictions set on the scanner
    */
   public Iterator<Entry<Key,Value>> iterator();
+  
+  /**
+   * This setting determines how long a scanner will automatically retry when a failure occurs. By default a scanner will retry forever.
+   * 
+   * Setting to zero or Long.MAX_VALUE and TimeUnit.MILLISECONDS means to retry forever.
+   * 
+   * @param timeOut
+   * @param timeUnit
+   *          determines how timeout is interpreted
+   */
+  public void setTimeout(long timeOut, TimeUnit timeUnit);
+  
+  /**
+   * Returns the setting for how long a scanner will automatically retry when a failure occurs.
+   * 
+   * @return the timeout configured for this scanner
+   */
+  public long getTimeout(TimeUnit timeUnit);
 }

Modified: accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/ZooKeeperInstance.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/ZooKeeperInstance.java?rev=1391754&r1=1391753&r2=1391754&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/ZooKeeperInstance.java (original)
+++ accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/ZooKeeperInstance.java Sat Sep 29 05:42:59 2012
@@ -233,6 +233,13 @@ public class ZooKeeperInstance implement
   }
   
   /**
+   * @deprecated Use {@link #lookupInstanceName(org.apache.accumulo.fate.zookeeper.ZooCache, UUID)} instead
+   */
+  public static String lookupInstanceName(org.apache.accumulo.core.zookeeper.ZooCache zooCache, UUID instanceId) {
+    return lookupInstanceName((ZooCache) zooCache, instanceId);
+  }
+  
+  /**
    * Given a zooCache and instanceId, look up the instance name.
    * 
    * @param zooCache

Modified: accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/admin/FindMax.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/admin/FindMax.java?rev=1391754&r1=1391753&r2=1391754&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/admin/FindMax.java (original)
+++ accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/admin/FindMax.java Sat Sep 29 05:42:59 2012
@@ -89,7 +89,7 @@ public class FindMax {
   
   private static Text _findMax(Scanner scanner, Text start, boolean inclStart, Text end, boolean inclEnd) {
     
-    // System.out.printf("findMax(%s, %s, %s, %s)\n", Key.toPrintableString(start.getBytes(), 0, start.getLength(), 1000), inclStart,
+    // System.out.printf("findMax(%s, %s, %s, %s)%n", Key.toPrintableString(start.getBytes(), 0, start.getLength(), 1000), inclStart,
     // Key.toPrintableString(end.getBytes(), 0, end.getLength(), 1000), inclEnd);
     
     int cmp = start.compareTo(end);

Modified: accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/admin/InstanceOperationsImpl.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/admin/InstanceOperationsImpl.java?rev=1391754&r1=1391753&r2=1391754&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/admin/InstanceOperationsImpl.java (original)
+++ accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/admin/InstanceOperationsImpl.java Sat Sep 29 05:42:59 2012
@@ -36,7 +36,6 @@ import org.apache.accumulo.core.client.i
 import org.apache.accumulo.core.master.thrift.MasterClientService;
 import org.apache.accumulo.core.security.thrift.AuthInfo;
 import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
-import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Client;
 import org.apache.accumulo.core.util.ArgumentChecker;
 import org.apache.accumulo.core.util.ThriftUtil;
 import org.apache.accumulo.core.zookeeper.ZooUtil;

Modified: accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java?rev=1391754&r1=1391753&r2=1391754&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java (original)
+++ accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java Sat Sep 29 05:42:59 2012
@@ -98,6 +98,36 @@ public interface TableOperations {
   public void create(String tableName, boolean versioningIter, TimeType timeType) throws AccumuloException, AccumuloSecurityException, TableExistsException;
   
   /**
+   * Imports a table exported via exportTable and copied via hadoop distcp.
+   * 
+   * @param tableName
+   *          Name of a table to create and import into.
+   * @param importDir
+   *          Directory that contains the files copied by distcp from exportTable
+   * @throws TableExistsException
+   * @throws AccumuloException
+   * @throws AccumuloSecurityException
+   */
+  public void importTable(String tableName, String importDir) throws TableExistsException, AccumuloException, AccumuloSecurityException;
+  
+  /**
+   * Exports a table. The table's data is not exported, just table metadata and a list of files to distcp. The table being exported must be offline and stay
+   * offline for the duration of distcp. To avoid losing access to a table, it can be cloned and the clone taken offline for export.
+   * 
+   * <p>
+   * See docs/examples/README.export
+   * 
+   * @param tableName
+   *          Name of the table to export.
+   * @param exportDir
+   *          An empty directory in HDFS where files containing table metadata and list of files to distcp will be placed.
+   * @throws TableNotFoundException
+   * @throws AccumuloException
+   * @throws AccumuloSecurityException
+   */
+  public void exportTable(String tableName, String exportDir) throws TableNotFoundException, AccumuloException, AccumuloSecurityException;
+
+  /**
    * @param tableName
    *          the name of the table
    * @param partitionKeys

Modified: accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperationsImpl.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperationsImpl.java?rev=1391754&r1=1391753&r2=1391754&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperationsImpl.java (original)
+++ accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperationsImpl.java Sat Sep 29 05:42:59 2012
@@ -16,7 +16,9 @@
  */
 package org.apache.accumulo.core.client.admin;
 
+import java.io.BufferedReader;
 import java.io.IOException;
+import java.io.InputStreamReader;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -37,6 +39,8 @@ import java.util.concurrent.ExecutorServ
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
 
 import org.apache.accumulo.cloudtrace.instrument.Tracer;
 import org.apache.accumulo.core.Constants;
@@ -65,6 +69,7 @@ import org.apache.accumulo.core.conf.Pro
 import org.apache.accumulo.core.data.ByteSequence;
 import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.file.FileUtil;
 import org.apache.accumulo.core.iterators.IteratorUtil;
 import org.apache.accumulo.core.master.state.tables.TableState;
 import org.apache.accumulo.core.master.thrift.MasterClientService;
@@ -1003,7 +1008,7 @@ public class TableOperationsImpl extends
   public void importDirectory(String tableName, String dir, String failureDir, boolean setTime) throws IOException, AccumuloSecurityException,
       TableNotFoundException, AccumuloException {
     ArgumentChecker.notNull(tableName, dir, failureDir);
-    FileSystem fs = FileSystem.get(CachedConfiguration.getInstance());
+    FileSystem fs = FileUtil.getFileSystem(CachedConfiguration.getInstance(), instance.getConfiguration());
     Path failPath = fs.makeQualified(new Path(failureDir));
     if (!fs.exists(new Path(dir)))
       throw new AccumuloException("Bulk import directory " + dir + " does not exist!");
@@ -1107,4 +1112,76 @@ public class TableOperationsImpl extends
     Scanner scanner = instance.getConnector(credentials).createScanner(tableName, auths);
     return FindMax.findMax(scanner, startRow, startInclusive, endRow, endInclusive);
   }
+  
+  public static Map<String,String> getExportedProps(FileSystem fs, Path path) throws IOException {
+    HashMap<String,String> props = new HashMap<String,String>();
+    // Collects the key=value lines of the table config entry in the zip at path; if that entry is absent the map stays empty.
+    ZipInputStream zis = new ZipInputStream(fs.open(path));
+    try {
+      ZipEntry zipEntry;
+      while ((zipEntry = zis.getNextEntry()) != null) {
+        if (zipEntry.getName().equals(Constants.EXPORT_TABLE_CONFIG_FILE)) {
+          BufferedReader in = new BufferedReader(new InputStreamReader(zis));
+          String line;
+          while ((line = in.readLine()) != null) {
+            String sa[] = line.split("=", 2);
+            if (sa.length == 2) props.put(sa[0], sa[1]); // a line without '=' (e.g. blank) has no sa[1]; skip it rather than throw
+          }
+          // the config entry has been consumed, so stop walking the archive
+          break;
+        }
+      }
+    } finally {
+      zis.close();
+    }
+    return props;
+  }
+
+  @Override
+  public void importTable(String tableName, String importDir) throws TableExistsException, AccumuloException, AccumuloSecurityException {
+    ArgumentChecker.notNull(tableName, importDir);
+    
+    // Advisory pre-check: log every class property whose value does not contain Constants.CORE_PACKAGE_NAME.
+    try {
+      FileSystem fs = FileUtil.getFileSystem(CachedConfiguration.getInstance(), instance.getConfiguration());
+      Map<String,String> exported = getExportedProps(fs, new Path(importDir, Constants.EXPORT_FILE));
+      
+      for (Map.Entry<String,String> prop : exported.entrySet()) {
+        if (Property.isClassProperty(prop.getKey()) && !prop.getValue().contains(Constants.CORE_PACKAGE_NAME)) {
+          Logger.getLogger(this.getClass()).info(
+              "Imported table sets '" + prop.getKey() + "' to '" + prop.getValue() + "'.  Ensure this class is on Accumulo classpath.");
+        }
+      }
+    } catch (IOException ioe) {
+      // a failed pre-check is only logged; the import below is still attempted
+      Logger.getLogger(this.getClass()).warn("Failed to check if imported table references external java classes : " + ioe.getMessage());
+    }
+    
+    List<ByteBuffer> arguments = Arrays.asList(ByteBuffer.wrap(tableName.getBytes()), ByteBuffer.wrap(importDir.getBytes()));
+    Map<String,String> noOptions = Collections.emptyMap();
+    
+    try {
+      doTableOperation(TableOperation.IMPORT, arguments, noOptions);
+    } catch (TableNotFoundException unexpected) {
+      // should not happen
+      throw new RuntimeException(unexpected);
+    }
+    
+  }
+  
+  @Override
+  public void exportTable(String tableName, String exportDir) throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
+    ArgumentChecker.notNull(tableName, exportDir);
+    
+    // the operation arguments are passed as raw bytes: table name first, then export directory
+    ByteBuffer tableArg = ByteBuffer.wrap(tableName.getBytes());
+    ByteBuffer dirArg = ByteBuffer.wrap(exportDir.getBytes());
+    Map<String,String> noOptions = Collections.emptyMap();
+    try {
+      doTableOperation(TableOperation.EXPORT, Arrays.asList(tableArg, dirArg), noOptions);
+    } catch (TableExistsException unexpected) {
+      // should not happen
+      throw new RuntimeException(unexpected);
+    }
+  }
 }

Modified: accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/impl/AccumuloServerException.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/impl/AccumuloServerException.java?rev=1391754&r1=1391753&r2=1391754&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/impl/AccumuloServerException.java (original)
+++ accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/impl/AccumuloServerException.java Sat Sep 29 05:42:59 2012
@@ -28,12 +28,12 @@ public class AccumuloServerException ext
   private static final long serialVersionUID = 1L;
   private String server;
   
-  public AccumuloServerException(String server, TApplicationException tae) {
+  public AccumuloServerException(final String server, final TApplicationException tae) {
     super("Error on server " + server, tae);
     this.setServer(server);
   }
   
-  private void setServer(String server) {
+  private void setServer(final String server) {
     this.server = server;
   }
   

Modified: accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/impl/BatchWriterImpl.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/impl/BatchWriterImpl.java?rev=1391754&r1=1391753&r2=1391754&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/impl/BatchWriterImpl.java (original)
+++ accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/impl/BatchWriterImpl.java Sat Sep 29 05:42:59 2012
@@ -17,6 +17,7 @@
 package org.apache.accumulo.core.client.impl;
 
 import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.MutationsRejectedException;
 import org.apache.accumulo.core.data.Mutation;
@@ -28,10 +29,10 @@ public class BatchWriterImpl implements 
   private String table;
   private TabletServerBatchWriter bw;
   
-  public BatchWriterImpl(Instance instance, AuthInfo credentials, String table, long maxMemory, long maxLatency, int maxWriteThreads) {
+  public BatchWriterImpl(Instance instance, AuthInfo credentials, String table, BatchWriterConfig config) {
     ArgumentChecker.notNull(instance, credentials, table);
     this.table = table;
-    this.bw = new TabletServerBatchWriter(instance, credentials, maxMemory, maxLatency, maxWriteThreads);
+    this.bw = new TabletServerBatchWriter(instance, credentials, config);
   }
   
   @Override

Modified: accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java?rev=1391754&r1=1391753&r2=1391754&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java (original)
+++ accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java Sat Sep 29 05:42:59 2012
@@ -17,6 +17,7 @@
 package org.apache.accumulo.core.client.impl;
 
 import java.nio.ByteBuffer;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.accumulo.cloudtrace.instrument.Tracer;
 import org.apache.accumulo.core.client.AccumuloException;
@@ -24,6 +25,7 @@ import org.apache.accumulo.core.client.A
 import org.apache.accumulo.core.client.BatchDeleter;
 import org.apache.accumulo.core.client.BatchScanner;
 import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.MultiTableBatchWriter;
@@ -104,18 +106,34 @@ public class ConnectorImpl extends Conne
   public BatchDeleter createBatchDeleter(String tableName, Authorizations authorizations, int numQueryThreads, long maxMemory, long maxLatency,
       int maxWriteThreads) throws TableNotFoundException {
     ArgumentChecker.notNull(tableName, authorizations);
-    return new TabletServerBatchDeleter(instance, credentials, getTableId(tableName), authorizations, numQueryThreads, maxMemory, maxLatency, maxWriteThreads);
+    return new TabletServerBatchDeleter(instance, credentials, getTableId(tableName), authorizations, numQueryThreads, new BatchWriterConfig()
+        .setMaxMemory(maxMemory).setMaxLatency(maxLatency, TimeUnit.MILLISECONDS).setMaxWriteThreads(maxWriteThreads));
+  }
+  
+  @Override
+  public BatchDeleter createBatchDeleter(String tableName, Authorizations authorizations, int numQueryThreads, BatchWriterConfig config)
+      throws TableNotFoundException {
+    ArgumentChecker.notNull(tableName, authorizations);
+    return new TabletServerBatchDeleter(instance, credentials, getTableId(tableName), authorizations, numQueryThreads, config);
   }
   
   @Override
   public BatchWriter createBatchWriter(String tableName, long maxMemory, long maxLatency, int maxWriteThreads) throws TableNotFoundException {
     ArgumentChecker.notNull(tableName);
-    return new BatchWriterImpl(instance, credentials, getTableId(tableName), maxMemory, maxLatency, maxWriteThreads);
+    return new BatchWriterImpl(instance, credentials, getTableId(tableName), new BatchWriterConfig().setMaxMemory(maxMemory)
+        .setMaxLatency(maxLatency, TimeUnit.MILLISECONDS).setMaxWriteThreads(maxWriteThreads));
+  }
+  
+  @Override
+  public BatchWriter createBatchWriter(String tableName, BatchWriterConfig config) throws TableNotFoundException {
+    ArgumentChecker.notNull(tableName);
+    return new BatchWriterImpl(instance, credentials, getTableId(tableName), config);
   }
   
   @Override
   public MultiTableBatchWriter createMultiTableBatchWriter(long maxMemory, long maxLatency, int maxWriteThreads) {
-    return new MultiTableBatchWriterImpl(instance, credentials, maxMemory, maxLatency, maxWriteThreads);
+    return new MultiTableBatchWriterImpl(instance, credentials, new BatchWriterConfig().setMaxMemory(maxMemory)
+        .setMaxLatency(maxLatency, TimeUnit.MILLISECONDS).setMaxWriteThreads(maxWriteThreads));
   }
   
   @Override

Modified: accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/impl/MultiTableBatchWriterImpl.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/impl/MultiTableBatchWriterImpl.java?rev=1391754&r1=1391753&r2=1391754&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/impl/MultiTableBatchWriterImpl.java (original)
+++ accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/impl/MultiTableBatchWriterImpl.java Sat Sep 29 05:42:59 2012
@@ -21,6 +21,7 @@ import java.util.HashMap;
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.MultiTableBatchWriter;
 import org.apache.accumulo.core.client.MutationsRejectedException;
@@ -71,10 +72,10 @@ public class MultiTableBatchWriterImpl i
   private HashMap<String,BatchWriter> tableWriters;
   private Instance instance;
   
-  public MultiTableBatchWriterImpl(Instance instance, AuthInfo credentials, long maxMemory, long maxLatency, int maxWriteThreads) {
+  public MultiTableBatchWriterImpl(Instance instance, AuthInfo credentials, BatchWriterConfig config) {
     ArgumentChecker.notNull(instance, credentials);
     this.instance = instance;
-    this.bw = new TabletServerBatchWriter(instance, credentials, maxMemory, maxLatency, maxWriteThreads);
+    this.bw = new TabletServerBatchWriter(instance, credentials, config);
     tableWriters = new HashMap<String,BatchWriter>();
     this.closed = false;
   }

Modified: accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java?rev=1391754&r1=1391753&r2=1391754&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java (original)
+++ accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java Sat Sep 29 05:42:59 2012
@@ -128,7 +128,7 @@ class OfflineIterator implements Iterato
     this.range = range;
     
     if (this.options.fetchedColumns.size() > 0) {
-      range = range.bound(this.options.fetchedColumns.first(), this.options.fetchedColumns.last());
+      this.range = range.bound(this.options.fetchedColumns.first(), this.options.fetchedColumns.last());
     }
 
     this.tableId = table.toString();

Modified: accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerImpl.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerImpl.java?rev=1391754&r1=1391753&r2=1391754&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerImpl.java (original)
+++ accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerImpl.java Sat Sep 29 05:42:59 2012
@@ -29,6 +29,7 @@ package org.apache.accumulo.core.client.
 
 import java.util.Iterator;
 import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.Instance;
@@ -54,7 +55,6 @@ public class ScannerImpl extends Scanner
   private Text table;
   
   private int size;
-  private int timeOut;
   
   private Range range;
   private boolean isolated = false;
@@ -68,23 +68,6 @@ public class ScannerImpl extends Scanner
     this.authorizations = authorizations;
     
     this.size = Constants.SCAN_BATCH_SIZE;
-    this.timeOut = Integer.MAX_VALUE;
-  }
-  
-  /**
-   * When failure occurs, the scanner automatically retries. This setting determines how long a scanner will retry. By default a scanner will retry forever.
-   * 
-   * @param timeOut
-   *          in milliseconds
-   */
-  @Override
-  public synchronized void setTimeOut(int timeOut) {
-    this.timeOut = timeOut;
-  }
-  
-  @Override
-  public synchronized int getTimeOut() {
-    return timeOut;
   }
   
   @Override
@@ -116,7 +99,7 @@ public class ScannerImpl extends Scanner
    * Scanner object will have no effect on existing iterators.
    */
   public synchronized Iterator<Entry<Key,Value>> iterator() {
-    return new ScannerIterator(instance, credentials, table, authorizations, range, size, timeOut, this, isolated);
+    return new ScannerIterator(instance, credentials, table, authorizations, range, size, getTimeOut(), this, isolated);
   }
   
   @Override
@@ -128,4 +111,20 @@ public class ScannerImpl extends Scanner
   public synchronized void disableIsolation() {
     this.isolated = false;
   }
+  
+  @Override
+  public void setTimeOut(int timeOut) {
+    // Integer.MAX_VALUE is mapped onto Long.MAX_VALUE milliseconds; every other value is passed on as seconds
+    final boolean unbounded = timeOut == Integer.MAX_VALUE;
+    final long amount = unbounded ? Long.MAX_VALUE : timeOut;
+    setTimeout(amount, unbounded ? TimeUnit.MILLISECONDS : TimeUnit.SECONDS);
+  }
+  
+  @Override
+  public int getTimeOut() {
+    // report the timeout in whole seconds, capped at Integer.MAX_VALUE so the narrowing cast cannot overflow
+    final long seconds = getTimeout(TimeUnit.SECONDS);
+    final long capped = Math.min(seconds, Integer.MAX_VALUE);
+    return (int) capped;
+  }
 }

Modified: accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerOptions.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerOptions.java?rev=1391754&r1=1391753&r2=1391754&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerOptions.java (original)
+++ accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerOptions.java Sat Sep 29 05:42:59 2012
@@ -26,6 +26,7 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.ScannerBase;
@@ -44,6 +45,8 @@ public class ScannerOptions implements S
   
   protected SortedSet<Column> fetchedColumns = new TreeSet<Column>();
   
+  protected long timeOut = Long.MAX_VALUE;
+
   private String regexIterName = null;
   
   protected ScannerOptions() {}
@@ -181,4 +184,21 @@ public class ScannerOptions implements S
   public Iterator<Entry<Key,Value>> iterator() {
     throw new UnsupportedOperationException();
   }
+  
+  @Override
+  public void setTimeout(long timeout, TimeUnit timeUnit) {
+    if (timeout < 0) { // check the argument being set, not the timeOut field
+      throw new IllegalArgumentException("TimeOut must be positive : " + timeout);
+    }
+    // zero is shorthand for "retry forever", which is stored as Long.MAX_VALUE milliseconds
+    if (timeout == 0)
+      this.timeOut = Long.MAX_VALUE;
+    else
+      this.timeOut = timeUnit.toMillis(timeout);
+  }
+  
+  @Override
+  public long getTimeout(TimeUnit timeunit) {
+    return timeunit.convert(timeOut, TimeUnit.MILLISECONDS); // timeOut is held in milliseconds (setTimeout stores toMillis)
+  }
 }

Modified: accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchDeleter.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchDeleter.java?rev=1391754&r1=1391753&r2=1391754&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchDeleter.java (original)
+++ accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchDeleter.java Sat Sep 29 05:42:59 2012
@@ -21,6 +21,7 @@ import java.util.Map.Entry;
 
 import org.apache.accumulo.core.client.BatchDeleter;
 import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.MutationsRejectedException;
@@ -38,19 +39,15 @@ public class TabletServerBatchDeleter ex
   private Instance instance;
   private AuthInfo credentials;
   private String tableId;
-  private long maxMemory;
-  private long maxLatency;
-  private int maxWriteThreads;
+  private BatchWriterConfig bwConfig;
   
-  public TabletServerBatchDeleter(Instance instance, AuthInfo credentials, String tableId, Authorizations authorizations, int numQueryThreads, long maxMemory,
-      long maxLatency, int maxWriteThreads) throws TableNotFoundException {
+  public TabletServerBatchDeleter(Instance instance, AuthInfo credentials, String tableId, Authorizations authorizations, int numQueryThreads,
+      BatchWriterConfig bwConfig) throws TableNotFoundException {
     super(instance, credentials, tableId, authorizations, numQueryThreads);
     this.instance = instance;
     this.credentials = credentials;
     this.tableId = tableId;
-    this.maxMemory = maxMemory;
-    this.maxLatency = maxLatency;
-    this.maxWriteThreads = maxWriteThreads;
+    this.bwConfig = bwConfig;
     super.addScanIterator(new IteratorSetting(Integer.MAX_VALUE, BatchDeleter.class.getName() + ".NOVALUE", SortedKeyIterator.class));
   }
   
@@ -58,7 +55,7 @@ public class TabletServerBatchDeleter ex
   public void delete() throws MutationsRejectedException, TableNotFoundException {
     BatchWriter bw = null;
     try {
-      bw = new BatchWriterImpl(instance, credentials, tableId, maxMemory, maxLatency, maxWriteThreads);
+      bw = new BatchWriterImpl(instance, credentials, tableId, bwConfig);
       Iterator<Entry<Key,Value>> iter = super.iterator();
       while (iter.hasNext()) {
         Entry<Key,Value> next = iter.next();

Modified: accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReader.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReader.java?rev=1391754&r1=1391753&r2=1391754&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReader.java (original)
+++ accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReader.java Sat Sep 29 05:42:59 2012
@@ -107,6 +107,6 @@ public class TabletServerBatchReader ext
       throw new IllegalStateException("batch reader closed");
     }
     
-    return new TabletServerBatchReaderIterator(instance, credentials, table, authorizations, ranges, numThreads, queryThreadPool, this);
+    return new TabletServerBatchReaderIterator(instance, credentials, table, authorizations, ranges, numThreads, queryThreadPool, this, timeOut);
   }
 }

Modified: accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java?rev=1391754&r1=1391753&r2=1391754&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java (original)
+++ accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java Sat Sep 29 05:42:59 2012
@@ -28,6 +28,7 @@ import java.util.ListIterator;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.NoSuchElementException;
+import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Semaphore;
@@ -42,7 +43,9 @@ import org.apache.accumulo.core.client.I
 import org.apache.accumulo.core.client.TableDeletedException;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.TableOfflineException;
+import org.apache.accumulo.core.client.TimedOutException;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Column;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.KeyExtent;
@@ -94,6 +97,12 @@ public class TabletServerBatchReaderIter
   
   private volatile Throwable fatalException = null;
   
+  private Map<String,TimeoutTracker> timeoutTrackers;
+  private Set<String> timedoutServers;
+  private long timeout;
+
+  private TabletLocator locator;
+
   public interface ResultReceiver {
     void receive(List<Entry<Key,Value>> entries);
   }
@@ -126,7 +135,7 @@ public class TabletServerBatchReaderIter
   }
   
   public TabletServerBatchReaderIterator(Instance instance, AuthInfo credentials, String table, Authorizations authorizations, ArrayList<Range> ranges,
-      int numThreads, ExecutorService queryThreadPool, ScannerOptions scannerOptions) {
+      int numThreads, ExecutorService queryThreadPool, ScannerOptions scannerOptions, long timeout) {
     
     this.instance = instance;
     this.credentials = credentials;
@@ -137,6 +146,12 @@ public class TabletServerBatchReaderIter
     this.options = new ScannerOptions(scannerOptions);
     resultsQueue = new ArrayBlockingQueue<List<Entry<Key,Value>>>(numThreads);
     
+    this.locator = new TimeoutTabletLocator(TabletLocator.getInstance(instance, credentials, new Text(table)), timeout);
+
+    timeoutTrackers = Collections.synchronizedMap(new HashMap<String,TabletServerBatchReaderIterator.TimeoutTracker>());
+    timedoutServers = Collections.synchronizedSet(new HashSet<String>());
+    this.timeout = timeout;
+
     if (options.fetchedColumns.size() > 0) {
       ArrayList<Range> ranges2 = new ArrayList<Range>(ranges.size());
       for (Range range : ranges) {
@@ -229,7 +244,7 @@ public class TabletServerBatchReaderIter
     
     Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
     
-    binRanges(TabletLocator.getInstance(instance, credentials, new Text(table)), ranges, binnedRanges);
+    binRanges(locator, ranges, binnedRanges);
     
     doLookups(binnedRanges, receiver, columns);
   }
@@ -303,11 +318,9 @@ public class TabletServerBatchReaderIter
     for (List<Range> ranges : failures.values())
       allRanges.addAll(ranges);
     
-    TabletLocator tabletLocator = TabletLocator.getInstance(instance, credentials, new Text(table));
-    
     // since the first call to binRanges clipped the ranges to within a tablet, we should not get only
     // bin to the set of failed tablets
-    binRanges(tabletLocator, allRanges, binnedRanges);
+    binRanges(locator, allRanges, binnedRanges);
     
     doLookups(binnedRanges, receiver, columns);
   }
@@ -341,10 +354,15 @@ public class TabletServerBatchReaderIter
       Map<KeyExtent,List<Range>> unscanned = new HashMap<KeyExtent,List<Range>>();
       Map<KeyExtent,List<Range>> tsFailures = new HashMap<KeyExtent,List<Range>>();
       try {
-        doLookup(tsLocation, tabletsRanges, tsFailures, unscanned, receiver, columns, credentials, options, authorizations, instance.getConfiguration());
+        TimeoutTracker timeoutTracker = timeoutTrackers.get(tsLocation);
+        if (timeoutTracker == null) {
+          timeoutTracker = new TimeoutTracker(tsLocation, timedoutServers, timeout);
+          timeoutTrackers.put(tsLocation, timeoutTracker);
+        }
+        doLookup(tsLocation, tabletsRanges, tsFailures, unscanned, receiver, columns, credentials, options, authorizations, instance.getConfiguration(),
+            timeoutTracker);
         if (tsFailures.size() > 0) {
-          TabletLocator tabletLocator = TabletLocator.getInstance(instance, credentials, new Text(table));
-          tabletLocator.invalidateCache(tsFailures.keySet());
+          locator.invalidateCache(tsFailures.keySet());
           synchronized (failures) {
             failures.putAll(tsFailures);
           }
@@ -356,7 +374,7 @@ public class TabletServerBatchReaderIter
           failures.putAll(unscanned);
         }
         
-        TabletLocator.getInstance(instance, credentials, new Text(table)).invalidateCache(tsLocation);
+        locator.invalidateCache(tsLocation);
         log.debug(e.getMessage(), e);
       } catch (AccumuloSecurityException e) {
         log.debug(e.getMessage(), e);
@@ -425,6 +443,12 @@ public class TabletServerBatchReaderIter
   }
   
   private void doLookups(Map<String,Map<KeyExtent,List<Range>>> binnedRanges, final ResultReceiver receiver, List<Column> columns) {
+    
+    if (timedoutServers.containsAll(binnedRanges.keySet())) {
+      // all servers have timed out
+      throw new TimedOutException(timedoutServers);
+    }
+
     // when there are lots of threads and a few tablet servers
     // it is good to break request to tablet servers up, the
     // following code determines if this is the case
@@ -444,6 +468,17 @@ public class TabletServerBatchReaderIter
     
     Map<KeyExtent,List<Range>> failures = new HashMap<KeyExtent,List<Range>>();
     
+    if (timedoutServers.size() > 0) {
+      // go ahead and fail any timed out servers
+      for (Iterator<Entry<String,Map<KeyExtent,List<Range>>>> iterator = binnedRanges.entrySet().iterator(); iterator.hasNext();) {
+        Entry<String,Map<KeyExtent,List<Range>>> entry = iterator.next();
+        if (timedoutServers.contains(entry.getKey())) {
+          failures.putAll(entry.getValue());
+          iterator.remove();
+        }
+      }
+    }
+
     // randomize tabletserver order... this will help when there are multiple
     // batch readers and writers running against accumulo
     List<String> locations = new ArrayList<String>(binnedRanges.keySet());
@@ -516,9 +551,65 @@ public class TabletServerBatchReaderIter
     }
   }
   
+  private static class TimeoutTracker {
+    
+    String server;
+    Set<String> badServers;
+    long timeOut;
+    long activityTime;
+    Long firstErrorTime = null;
+    
+    TimeoutTracker(String server, Set<String> badServers, long timeOut) {
+      this(timeOut);
+      this.server = server;
+      this.badServers = badServers;
+    }
+
+    TimeoutTracker(long timeOut) {
+      this.timeOut = timeOut;
+    }
+
+    void startingScan() {
+      activityTime = System.currentTimeMillis();
+    }
+    
+    void check() throws IOException {
+      if (System.currentTimeMillis() - activityTime > timeOut) {
+        badServers.add(server);
+        throw new IOException("Time exceeded " + (System.currentTimeMillis() - activityTime) + " " + server);
+      }
+    }
+    
+    void madeProgress() {
+      activityTime = System.currentTimeMillis();
+      firstErrorTime = null;
+    }
+    
+    void errorOccured(Exception e) {
+      if (firstErrorTime == null) {
+        firstErrorTime = activityTime;
+      } else if (System.currentTimeMillis() - firstErrorTime > timeOut) {
+        badServers.add(server);
+      }
+    }
+    
+    /**
+     * @return the timeout, in milliseconds, that this tracker was constructed with
+     */
+    public long getTimeOut() {
+      return timeOut;
+    }
+  }
+
   static void doLookup(String server, Map<KeyExtent,List<Range>> requested, Map<KeyExtent,List<Range>> failures, Map<KeyExtent,List<Range>> unscanned,
       ResultReceiver receiver, List<Column> columns, AuthInfo credentials, ScannerOptions options, Authorizations authorizations, AccumuloConfiguration conf)
       throws IOException, AccumuloSecurityException, AccumuloServerException {
+    doLookup(server, requested, failures, unscanned, receiver, columns, credentials, options, authorizations, conf, new TimeoutTracker(Long.MAX_VALUE));
+  }
+  
+  static void doLookup(String server, Map<KeyExtent,List<Range>> requested, Map<KeyExtent,List<Range>> failures, Map<KeyExtent,List<Range>> unscanned,
+      ResultReceiver receiver, List<Column> columns, AuthInfo credentials, ScannerOptions options, Authorizations authorizations, AccumuloConfiguration conf,
+      TimeoutTracker timeoutTracker) throws IOException, AccumuloSecurityException, AccumuloServerException {
     
     if (requested.size() == 0) {
       return;
@@ -533,10 +624,19 @@ public class TabletServerBatchReaderIter
       unscanned.put(new KeyExtent(entry.getKey()), ranges);
     }
     
+    timeoutTracker.startingScan();
     TTransport transport = null;
     try {
-      TabletClientService.Client client = ThriftUtil.getTServerClient(server, conf);
+      TabletClientService.Client client;
+      if (timeoutTracker.getTimeOut() < conf.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT))
+        client = ThriftUtil.getTServerClient(server, conf, timeoutTracker.getTimeOut());
+      else
+        client = ThriftUtil.getTServerClient(server, conf);
+
       try {
+        
+
+
         OpTimer opTimer = new OpTimer(log, Level.TRACE).start("Starting multi scan, tserver=" + server + "  #tablets=" + requested.size() + "  #ranges="
             + sumSizes(requested.values()) + " ssil=" + options.serverSideIteratorList + " ssio=" + options.serverSideIteratorOptions);
         
@@ -563,10 +663,15 @@ public class TabletServerBatchReaderIter
         if (entries.size() > 0)
           receiver.receive(entries);
 
+        if (entries.size() > 0 || scanResult.fullScans.size() > 0)
+          timeoutTracker.madeProgress();
+
         trackScanning(failures, unscanned, scanResult);
         
         while (scanResult.more) {
           
+          timeoutTracker.check();
+
           opTimer.start("Continuing multi scan, scanid=" + imsr.scanID);
           scanResult = client.continueMultiScan(Tracer.traceInfo(), imsr.scanID);
           opTimer.stop("Got more multi scan results, #results=" + scanResult.results.size() + (scanResult.more ? "  scanID=" + imsr.scanID : "")
@@ -579,6 +684,10 @@ public class TabletServerBatchReaderIter
           
           if (entries.size() > 0)
             receiver.receive(entries);
+          
+          if (entries.size() > 0 || scanResult.fullScans.size() > 0)
+            timeoutTracker.madeProgress();
+
           trackScanning(failures, unscanned, scanResult);
         }
         
@@ -589,6 +698,7 @@ public class TabletServerBatchReaderIter
       }
     } catch (TTransportException e) {
       log.debug("Server : " + server + " msg : " + e.getMessage());
+      timeoutTracker.errorOccured(e);
       throw new IOException(e);
     } catch (ThriftSecurityException e) {
       log.debug("Server : " + server + " msg : " + e.getMessage(), e);
@@ -598,6 +708,7 @@ public class TabletServerBatchReaderIter
       throw new AccumuloServerException(server, e);
     } catch (TException e) {
       log.debug("Server : " + server + " msg : " + e.getMessage(), e);
+      timeoutTracker.errorOccured(e);
       throw new IOException(e);
     } catch (NoSuchScanIDException e) {
       log.debug("Server : " + server + " msg : " + e.getMessage(), e);

Modified: accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java?rev=1391754&r1=1391753&r2=1391754&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java (original)
+++ accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java Sat Sep 29 05:42:59 2012
@@ -34,6 +34,7 @@ import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.accumulo.cloudtrace.instrument.Span;
@@ -43,12 +44,15 @@ import org.apache.accumulo.cloudtrace.th
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.MutationsRejectedException;
 import org.apache.accumulo.core.client.TableDeletedException;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.TableOfflineException;
+import org.apache.accumulo.core.client.TimedOutException;
 import org.apache.accumulo.core.client.impl.TabletLocator.TabletServerMutations;
+import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.constraints.Violations;
 import org.apache.accumulo.core.data.ConstraintViolationSummary;
 import org.apache.accumulo.core.data.KeyExtent;
@@ -72,7 +76,6 @@ import org.apache.thrift.TServiceClient;
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
 
-
 /*
  * Differences from previous TabletServerBatchWriter
  *   + As background threads finish sending mutations to tablet servers they decrement memory usage
@@ -121,6 +124,8 @@ public class TabletServerBatchWriter {
   
   private long maxLatency;
   
+  private long timeout;
+
   private long lastProcessingStartTime;
   
   private long totalAdded = 0;
@@ -143,11 +148,52 @@ public class TabletServerBatchWriter {
   
   private Throwable lastUnknownError = null;
   
-  public TabletServerBatchWriter(Instance instance, AuthInfo credentials, long maxMemory, long maxLatency, int numSendThreads) {
+  private Map<String,TimeoutTracker> timeoutTrackers;
+
+  private static class TimeoutTracker {
+    
+    String server;
+    long timeOut;
+    long activityTime;
+    Long firstErrorTime = null;
+    
+    TimeoutTracker(String server, long timeOut) {
+      this.timeOut = timeOut;
+      this.server = server;
+    }
+    
+    void startingWrite() {
+      activityTime = System.currentTimeMillis();
+    }
+    
+    void madeProgress() {
+      activityTime = System.currentTimeMillis();
+      firstErrorTime = null;
+    }
+
+    void wroteNothing() {
+      if (firstErrorTime == null) {
+        firstErrorTime = activityTime;
+      } else if (System.currentTimeMillis() - firstErrorTime > timeOut) {
+        throw new TimedOutException(Collections.singleton(server));
+      }
+    }
+    
+    void errorOccured(Exception e) {
+      wroteNothing();
+    }
+
+    public long getTimeOut() {
+      return timeOut;
+    }
+  }
+
+  public TabletServerBatchWriter(Instance instance, AuthInfo credentials, BatchWriterConfig config) {
     this.instance = instance;
-    this.maxMem = maxMemory;
-    this.maxLatency = maxLatency <= 0 ? Long.MAX_VALUE : maxLatency;
+    this.maxMem = config.getMaxMemory();
+    this.maxLatency = config.getMaxLatency(TimeUnit.MILLISECONDS) <= 0 ? Long.MAX_VALUE : config.getMaxLatency(TimeUnit.MILLISECONDS);
     this.credentials = credentials;
+    this.timeout = config.getTimeout(TimeUnit.MILLISECONDS);
     mutations = new MutationSet();
     
     violations = new Violations();
@@ -159,9 +205,11 @@ public class TabletServerBatchWriter {
     
     jtimer = new Timer("BatchWriterLatencyTimer", true);
     
-    writer = new MutationWriter(numSendThreads);
+    writer = new MutationWriter(config.getMaxWriteThreads());
     failedMutations = new FailedMutations();
     
+    timeoutTrackers = Collections.synchronizedMap(new HashMap<String,TabletServerBatchWriter.TimeoutTracker>());
+
     if (this.maxLatency != Long.MAX_VALUE) {
       jtimer.schedule(new TimerTask() {
         public void run() {
@@ -242,7 +290,7 @@ public class TabletServerBatchWriter {
     }
   }
   
-  public synchronized void addMutation(String table, Iterator<Mutation> iterator) throws MutationsRejectedException {
+  public void addMutation(String table, Iterator<Mutation> iterator) throws MutationsRejectedException {
     while (iterator.hasNext()) {
       addMutation(table, iterator.next());
     }
@@ -447,7 +495,7 @@ public class TabletServerBatchWriter {
     unknownErrors++;
     this.lastUnknownError = t;
     this.notifyAll();
-    if (t instanceof TableDeletedException || t instanceof TableOfflineException)
+    if (t instanceof TableDeletedException || t instanceof TableOfflineException || t instanceof TimedOutException)
       log.debug(msg, t); // this is not unknown
     else
       log.error(msg, t);
@@ -544,18 +592,31 @@ public class TabletServerBatchWriter {
     private ExecutorService sendThreadPool;
     private Map<String,TabletServerMutations> serversMutations;
     private Set<String> queued;
+    private Map<String,TabletLocator> locators;
     
     public MutationWriter(int numSendThreads) {
       serversMutations = new HashMap<String,TabletServerMutations>();
       queued = new HashSet<String>();
       sendThreadPool = Executors.newFixedThreadPool(numSendThreads, new NamingThreadFactory(this.getClass().getName()));
+      locators = new HashMap<String,TabletLocator>();
     }
     
+    private TabletLocator getLocator(String tableId) {
+      TabletLocator ret = locators.get(tableId);
+      if (ret == null) {
+        ret = TabletLocator.getInstance(instance, credentials, new Text(tableId));
+        ret = new TimeoutTabletLocator(ret, timeout);
+        locators.put(tableId, ret);
+      }
+      
+      return ret;
+    }
+
     private void binMutations(MutationSet mutationsToProcess, Map<String,TabletServerMutations> binnedMutations) {
       try {
         Set<Entry<String,List<Mutation>>> es = mutationsToProcess.getMutations().entrySet();
         for (Entry<String,List<Mutation>> entry : es) {
-          TabletLocator locator = TabletLocator.getInstance(instance, credentials, new Text(entry.getKey()));
+          TabletLocator locator = getLocator(entry.getKey());
           
           String table = entry.getKey();
           List<Mutation> tableMutations = entry.getValue();
@@ -701,8 +762,15 @@ public class TabletServerBatchWriter {
           
           Span span = Trace.start("sendMutations");
           try {
+            
+            TimeoutTracker timeoutTracker = timeoutTrackers.get(location);
+            if (timeoutTracker == null) {
+              timeoutTracker = new TimeoutTracker(location, timeout);
+              timeoutTrackers.put(location, timeoutTracker);
+            }
+
             long st1 = System.currentTimeMillis();
-            failures = sendMutationsToTabletServer(location, mutationBatch);
+            failures = sendMutationsToTabletServer(location, mutationBatch, timeoutTracker);
             long st2 = System.currentTimeMillis();
             if (log.isTraceEnabled())
               log.trace("sent " + String.format("%,d", count) + " mutations to " + location + " in "
@@ -744,7 +812,8 @@ public class TabletServerBatchWriter {
       }
     }
     
-    private MutationSet sendMutationsToTabletServer(String location, Map<KeyExtent,List<Mutation>> tabMuts) throws IOException, AccumuloSecurityException,
+    private MutationSet sendMutationsToTabletServer(String location, Map<KeyExtent,List<Mutation>> tabMuts, TimeoutTracker timeoutTracker) throws IOException,
+        AccumuloSecurityException,
         AccumuloServerException {
       if (tabMuts.size() == 0) {
         return new MutationSet();
@@ -752,8 +821,16 @@ public class TabletServerBatchWriter {
       TInfo tinfo = Tracer.traceInfo();
       TTransport transport = null;
       
+      timeoutTracker.startingWrite();
+
       try {
-        TabletClientService.Iface client = ThriftUtil.getTServerClient(location, instance.getConfiguration());
+        TabletClientService.Iface client;
+        
+        if (timeoutTracker.getTimeOut() < instance.getConfiguration().getTimeInMillis(Property.GENERAL_RPC_TIMEOUT))
+          client = ThriftUtil.getTServerClient(location, instance.getConfiguration(), timeoutTracker.getTimeOut());
+        else
+          client = ThriftUtil.getTServerClient(location, instance.getConfiguration());
+
         try {
           MutationSet allFailures = new MutationSet();
           
@@ -768,6 +845,7 @@ public class TabletServerBatchWriter {
             } catch (ConstraintViolationException e) {
               updatedConstraintViolations(Translator.translate(e.violationSummaries, Translator.TCVST));
             }
+            timeoutTracker.madeProgress();
           } else {
             
             long usid = client.startUpdate(tinfo, credentials);
@@ -790,13 +868,17 @@ public class TabletServerBatchWriter {
             }
             
             UpdateErrors updateErrors = client.closeUpdate(tinfo, usid);
+
             Map<KeyExtent,Long> failures = Translator.translate(updateErrors.failedExtents, Translator.TKET);
             updatedConstraintViolations(Translator.translate(updateErrors.violationSummaries, Translator.TCVST));
             updateAuthorizationFailures(Translator.translate(updateErrors.authorizationFailures, Translator.TKET));
             
+            long totalCommitted = 0;
+
             for (Entry<KeyExtent,Long> entry : failures.entrySet()) {
               KeyExtent failedExtent = entry.getKey();
               int numCommitted = (int) (long) entry.getValue();
+              totalCommitted += numCommitted;
               
               String table = failedExtent.getTableId().toString();
               
@@ -805,12 +887,21 @@ public class TabletServerBatchWriter {
               ArrayList<Mutation> mutations = (ArrayList<Mutation>) tabMuts.get(failedExtent);
               allFailures.addAll(table, mutations.subList(numCommitted, mutations.size()));
             }
+            
+            if (failures.keySet().containsAll(tabMuts.keySet()) && totalCommitted == 0) {
+              // nothing was successfully written
+              timeoutTracker.wroteNothing();
+            } else {
+              // successfully wrote something to tablet server
+              timeoutTracker.madeProgress();
+            }
           }
           return allFailures;
         } finally {
           ThriftUtil.returnClient((TServiceClient) client);
         }
       } catch (TTransportException e) {
+        timeoutTracker.errorOccured(e);
         throw new IOException(e);
       } catch (TApplicationException tae) {
         updateServerErrors(location, tae);

Modified: accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/impl/TabletType.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/impl/TabletType.java?rev=1391754&r1=1391753&r2=1391754&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/impl/TabletType.java (original)
+++ accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/impl/TabletType.java Sat Sep 29 05:42:59 2012
@@ -18,7 +18,6 @@ package org.apache.accumulo.core.client.
 
 import java.util.Collection;
 
-import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.data.KeyExtent;
 
 public enum TabletType {



Mime
View raw message