accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject svn commit: r1370914 - in /accumulo/trunk: core/src/main/java/org/apache/accumulo/core/client/ core/src/main/java/org/apache/accumulo/core/client/impl/ core/src/main/java/org/apache/accumulo/core/client/mock/ core/src/main/java/org/apache/accumulo/core...
Date Wed, 08 Aug 2012 19:47:58 GMT
Author: kturner
Date: Wed Aug  8 19:47:57 2012
New Revision: 1370914

URL: http://svn.apache.org/viewvc?rev=1370914&view=rev
Log:
ACCUMULO-705 added timeout to batch scanner

Added:
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/TimedOutException.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/functional/TimeoutTest.java
    accumulo/trunk/test/system/auto/simple/timeout.py
Modified:
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/BatchScanner.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/Scanner.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerImpl.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerOptions.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReader.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mock/MockBatchScanner.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/BatchScanner.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/BatchScanner.java?rev=1370914&r1=1370913&r2=1370914&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/BatchScanner.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/BatchScanner.java Wed
Aug  8 19:47:57 2012
@@ -44,4 +44,17 @@ public interface BatchScanner extends Sc
    * Cleans up and finalizes the scanner
    */
   void close();
+  
+  /**
+   * Sets a timeout threshold for a server to respond. The batch scanner will accomplish as much work as possible before throwing an exception. BatchScanner
+   * iterators will throw a {@link TimedOutException} when all needed servers timeout.
+   * 
+   * <p>
+   * If not set, the timeout defaults to MAX_INT
+   * 
+   * @param timeout
+   *          in seconds
+   */
+  @Override
+  void setTimeOut(int timeout);
 }

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/Scanner.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/Scanner.java?rev=1370914&r1=1370913&r2=1370914&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/Scanner.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/Scanner.java Wed Aug
 8 19:47:57 2012
@@ -28,21 +28,6 @@ import org.apache.accumulo.core.data.Ran
 public interface Scanner extends ScannerBase {
   
   /**
-   * This setting determines how long a scanner will automatically retry when a failure occurs. By default a scanner will retry forever.
-   * 
-   * @param timeOut
-   *          in seconds
-   */
-  public void setTimeOut(int timeOut);
-  
-  /**
-   * Returns the setting for how long a scanner will automatically retry when a failure occurs.
-   * 
-   * @return the timeout configured for this scanner
-   */
-  public int getTimeOut();
-  
-  /**
    * Sets the range of keys to scan over.
    * 
    * @param range

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java?rev=1370914&r1=1370913&r2=1370914&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java Wed
Aug  8 19:47:57 2012
@@ -100,4 +100,19 @@ public interface ScannerBase extends Ite
    * @return an iterator over Key,Value pairs which meet the restrictions set on the scanner
    */
   public Iterator<Entry<Key,Value>> iterator();
+  
+  /**
+   * This setting determines how long a scanner will automatically retry when a failure occurs. By default a scanner will retry forever.
+   * 
+   * @param timeOut
+   *          in seconds
+   */
+  public void setTimeOut(int timeOut);
+  
+  /**
+   * Returns the setting for how long a scanner will automatically retry when a failure occurs.
+   * 
+   * @return the timeout configured for this scanner
+   */
+  public int getTimeOut();
 }

Added: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/TimedOutException.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/TimedOutException.java?rev=1370914&view=auto
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/TimedOutException.java
(added)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/TimedOutException.java
Wed Aug  8 19:47:57 2012
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Set;
+
+/**
+ * 
+ */
+public class TimedOutException extends RuntimeException {
+  
+  private Set<String> timedoutServers;
+  
+  private static final long serialVersionUID = 1L;
+  
+  private static String shorten(Set<String> set) {
+    if (set.size() < 10) {
+      return set.toString();
+    }
+    
+    return new ArrayList<String>(set).subList(0, 10).toString() + " ... " + (set.size() - 10) + " servers not shown";
+  }
+
+  public TimedOutException(Set<String> timedoutServers) {
+    super("Servers timed out " + shorten(timedoutServers));
+    this.timedoutServers = timedoutServers;
+    
+  }
+
+  public Set<String> getTimedOutSevers() {
+    return Collections.unmodifiableSet(timedoutServers);
+  }
+}

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerImpl.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerImpl.java?rev=1370914&r1=1370913&r2=1370914&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerImpl.java
(original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerImpl.java
Wed Aug  8 19:47:57 2012
@@ -71,22 +71,6 @@ public class ScannerImpl extends Scanner
     this.timeOut = Integer.MAX_VALUE;
   }
   
-  /**
-   * When failure occurs, the scanner automatically retries. This setting determines how long a scanner will retry. By default a scanner will retry forever.
-   * 
-   * @param timeOut
-   *          in milliseconds
-   */
-  @Override
-  public synchronized void setTimeOut(int timeOut) {
-    this.timeOut = timeOut;
-  }
-  
-  @Override
-  public synchronized int getTimeOut() {
-    return timeOut;
-  }
-  
   @Override
   public synchronized void setRange(Range range) {
     ArgumentChecker.notNull(range);

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerOptions.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerOptions.java?rev=1370914&r1=1370913&r2=1370914&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerOptions.java
(original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerOptions.java
Wed Aug  8 19:47:57 2012
@@ -44,6 +44,8 @@ public class ScannerOptions implements S
   
   protected SortedSet<Column> fetchedColumns = new TreeSet<Column>();
   
+  protected int timeOut = Integer.MAX_VALUE;
+
   private String regexIterName = null;
   
   protected ScannerOptions() {}
@@ -181,4 +183,18 @@ public class ScannerOptions implements S
   public Iterator<Entry<Key,Value>> iterator() {
     throw new UnsupportedOperationException();
   }
+  
+  @Override
+  public void setTimeOut(int timeOut) {
+    if (timeOut <= 0) {
+      throw new IllegalArgumentException("TimeOut must be positive : " + timeOut);
+    }
+
+    this.timeOut = timeOut;
+  }
+  
+  @Override
+  public int getTimeOut() {
+    return timeOut;
+  }
 }

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReader.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReader.java?rev=1370914&r1=1370913&r2=1370914&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReader.java
(original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReader.java
Wed Aug  8 19:47:57 2012
@@ -107,6 +107,6 @@ public class TabletServerBatchReader ext
       throw new IllegalStateException("batch reader closed");
     }
     
-    return new TabletServerBatchReaderIterator(instance, credentials, table, authorizations, ranges, numThreads, queryThreadPool, this);
+    return new TabletServerBatchReaderIterator(instance, credentials, table, authorizations, ranges, numThreads, queryThreadPool, this, timeOut * 1000l);
   }
 }

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java?rev=1370914&r1=1370913&r2=1370914&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java
(original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java
Wed Aug  8 19:47:57 2012
@@ -28,6 +28,7 @@ import java.util.ListIterator;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.NoSuchElementException;
+import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Semaphore;
@@ -42,6 +43,7 @@ import org.apache.accumulo.core.client.I
 import org.apache.accumulo.core.client.TableDeletedException;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.TableOfflineException;
+import org.apache.accumulo.core.client.TimedOutException;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.data.Column;
 import org.apache.accumulo.core.data.Key;
@@ -94,6 +96,10 @@ public class TabletServerBatchReaderIter
   
   private volatile Throwable fatalException = null;
   
+  private Map<String,TimeoutTracker> timeoutTrackers;
+  private Set<String> timedoutServers;
+  private long timeout;
+
   public interface ResultReceiver {
     void receive(List<Entry<Key,Value>> entries);
   }
@@ -126,7 +132,7 @@ public class TabletServerBatchReaderIter
   }
   
   public TabletServerBatchReaderIterator(Instance instance, AuthInfo credentials, String
table, Authorizations authorizations, ArrayList<Range> ranges,
-      int numThreads, ExecutorService queryThreadPool, ScannerOptions scannerOptions) {
+      int numThreads, ExecutorService queryThreadPool, ScannerOptions scannerOptions, long
timeout) {
     
     this.instance = instance;
     this.credentials = credentials;
@@ -137,6 +143,10 @@ public class TabletServerBatchReaderIter
     this.options = new ScannerOptions(scannerOptions);
     resultsQueue = new ArrayBlockingQueue<List<Entry<Key,Value>>>(numThreads);
     
+    timeoutTrackers = Collections.synchronizedMap(new HashMap<String,TabletServerBatchReaderIterator.TimeoutTracker>());
+    timedoutServers = Collections.synchronizedSet(new HashSet<String>());
+    this.timeout = timeout;
+
     if (options.fetchedColumns.size() > 0) {
       ArrayList<Range> ranges2 = new ArrayList<Range>(ranges.size());
       for (Range range : ranges) {
@@ -341,7 +351,13 @@ public class TabletServerBatchReaderIter
       Map<KeyExtent,List<Range>> unscanned = new HashMap<KeyExtent,List<Range>>();
       Map<KeyExtent,List<Range>> tsFailures = new HashMap<KeyExtent,List<Range>>();
       try {
-        doLookup(tsLocation, tabletsRanges, tsFailures, unscanned, receiver, columns, credentials,
options, authorizations, instance.getConfiguration());
+        TimeoutTracker timeoutTracker = timeoutTrackers.get(tsLocation);
+        if (timeoutTracker == null) {
+          timeoutTracker = new TimeoutTracker(tsLocation, timedoutServers, timeout);
+          timeoutTrackers.put(tsLocation, timeoutTracker);
+        }
+        doLookup(tsLocation, tabletsRanges, tsFailures, unscanned, receiver, columns, credentials,
options, authorizations, instance.getConfiguration(),
+            timeoutTracker);
         if (tsFailures.size() > 0) {
           TabletLocator tabletLocator = TabletLocator.getInstance(instance, credentials,
new Text(table));
           tabletLocator.invalidateCache(tsFailures.keySet());
@@ -425,6 +441,12 @@ public class TabletServerBatchReaderIter
   }
   
   private void doLookups(Map<String,Map<KeyExtent,List<Range>>> binnedRanges,
final ResultReceiver receiver, List<Column> columns) {
+    
+    if (timedoutServers.containsAll(binnedRanges.keySet())) {
+      // all servers have timed out
+      throw new TimedOutException(timedoutServers);
+    }
+
     // when there are lots of threads and a few tablet servers
     // it is good to break request to tablet servers up, the
     // following code determines if this is the case
@@ -444,6 +466,17 @@ public class TabletServerBatchReaderIter
     
     Map<KeyExtent,List<Range>> failures = new HashMap<KeyExtent,List<Range>>();
     
+    if (timedoutServers.size() > 0) {
+      // go ahead and fail any timed out servers
+      for (Iterator<Entry<String,Map<KeyExtent,List<Range>>>> iterator
= binnedRanges.entrySet().iterator(); iterator.hasNext();) {
+        Entry<String,Map<KeyExtent,List<Range>>> entry = iterator.next();
+        if (timedoutServers.contains(entry.getKey())) {
+          failures.putAll(entry.getValue());
+          iterator.remove();
+        }
+      }
+    }
+
     // randomize tabletserver order... this will help when there are multiple
     // batch readers and writers running against accumulo
     List<String> locations = new ArrayList<String>(binnedRanges.keySet());
@@ -516,9 +549,65 @@ public class TabletServerBatchReaderIter
     }
   }
   
+  private static class TimeoutTracker {
+    
+    String server;
+    Set<String> badServers;
+    long timeOut;
+    long activityTime;
+    Long firstErrorTime = null;
+    
+    TimeoutTracker(String server, Set<String> badServers, long timeOut) {
+      this(timeOut);
+      this.server = server;
+      this.badServers = badServers;
+    }
+
+    TimeoutTracker(long timeOut) {
+      this.timeOut = timeOut;
+    }
+
+    void startingScan() {
+      activityTime = System.currentTimeMillis();
+    }
+    
+    void check() throws IOException {
+      if (System.currentTimeMillis() - activityTime > timeOut) {
+        badServers.add(server);
+        throw new IOException("Time exceeded " + (System.currentTimeMillis() - activityTime) + " " + server);
+      }
+    }
+    
+    void madeProgress() {
+      activityTime = System.currentTimeMillis();
+      firstErrorTime = null;
+    }
+    
+    void errorOccured(Exception e) {
+      if (firstErrorTime == null) {
+        firstErrorTime = activityTime;
+      } else if (System.currentTimeMillis() - firstErrorTime > timeOut) {
+        badServers.add(server);
+      }
+    }
+    
+    /**
+     * @return
+     */
+    public long getTimeOut() {
+      return timeOut;
+    }
+  }
+
   static void doLookup(String server, Map<KeyExtent,List<Range>> requested, Map<KeyExtent,List<Range>>
failures, Map<KeyExtent,List<Range>> unscanned,
       ResultReceiver receiver, List<Column> columns, AuthInfo credentials, ScannerOptions
options, Authorizations authorizations, AccumuloConfiguration conf)
       throws IOException, AccumuloSecurityException, AccumuloServerException {
+    doLookup(server, requested, failures, unscanned, receiver, columns, credentials, options,
authorizations, conf, new TimeoutTracker(Long.MAX_VALUE));
+  }
+  
+  static void doLookup(String server, Map<KeyExtent,List<Range>> requested, Map<KeyExtent,List<Range>>
failures, Map<KeyExtent,List<Range>> unscanned,
+      ResultReceiver receiver, List<Column> columns, AuthInfo credentials, ScannerOptions
options, Authorizations authorizations, AccumuloConfiguration conf,
+      TimeoutTracker timeoutTracker) throws IOException, AccumuloSecurityException, AccumuloServerException
{
     
     if (requested.size() == 0) {
       return;
@@ -533,10 +622,19 @@ public class TabletServerBatchReaderIter
       unscanned.put(new KeyExtent(entry.getKey()), ranges);
     }
     
+    timeoutTracker.startingScan();
     TTransport transport = null;
     try {
-      TabletClientService.Client client = ThriftUtil.getTServerClient(server, conf);
+      TabletClientService.Client client;
+      if (timeoutTracker.getTimeOut() < Integer.MAX_VALUE * 1000l)
+        client = ThriftUtil.getTServerClient(server, conf, timeoutTracker.getTimeOut());
+      else
+        client = ThriftUtil.getTServerClient(server, conf);
+
       try {
+        
+
+
         OpTimer opTimer = new OpTimer(log, Level.TRACE).start("Starting multi scan, tserver=" + server + "  #tablets=" + requested.size() + "  #ranges="
             + sumSizes(requested.values()) + " ssil=" + options.serverSideIteratorList + " ssio=" + options.serverSideIteratorOptions);
         
@@ -563,10 +661,15 @@ public class TabletServerBatchReaderIter
         if (entries.size() > 0)
           receiver.receive(entries);
 
+        if (entries.size() > 0 || scanResult.fullScans.size() > 0)
+          timeoutTracker.madeProgress();
+
         trackScanning(failures, unscanned, scanResult);
         
         while (scanResult.more) {
           
+          timeoutTracker.check();
+
           opTimer.start("Continuing multi scan, scanid=" + imsr.scanID);
           scanResult = client.continueMultiScan(Tracer.traceInfo(), imsr.scanID);
           opTimer.stop("Got more multi scan results, #results=" + scanResult.results.size() + (scanResult.more ? "  scanID=" + imsr.scanID : "")
@@ -579,6 +682,10 @@ public class TabletServerBatchReaderIter
           
           if (entries.size() > 0)
             receiver.receive(entries);
+          
+          if (entries.size() > 0 || scanResult.fullScans.size() > 0)
+            timeoutTracker.madeProgress();
+
           trackScanning(failures, unscanned, scanResult);
         }
         
@@ -589,6 +696,7 @@ public class TabletServerBatchReaderIter
       }
     } catch (TTransportException e) {
       log.debug("Server : " + server + " msg : " + e.getMessage());
+      timeoutTracker.errorOccured(e);
       throw new IOException(e);
     } catch (ThriftSecurityException e) {
       log.debug("Server : " + server + " msg : " + e.getMessage(), e);
@@ -598,6 +706,7 @@ public class TabletServerBatchReaderIter
       throw new AccumuloServerException(server, e);
     } catch (TException e) {
       log.debug("Server : " + server + " msg : " + e.getMessage(), e);
+      timeoutTracker.errorOccured(e);
       throw new IOException(e);
     } catch (NoSuchScanIDException e) {
       log.debug("Server : " + server + " msg : " + e.getMessage(), e);

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mock/MockBatchScanner.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mock/MockBatchScanner.java?rev=1370914&r1=1370913&r2=1370914&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mock/MockBatchScanner.java
(original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mock/MockBatchScanner.java
Wed Aug  8 19:47:57 2012
@@ -92,4 +92,11 @@ public class MockBatchScanner extends Mo
   @Override
   public void close() {}
   
+  @Override
+  public void setTimeOut(int timeout) {}
+  
+  @Override
+  public int getTimeOut() {
+    return Integer.MAX_VALUE;
+  }
 }

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java?rev=1370914&r1=1370913&r2=1370914&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java Wed Aug
 8 19:47:57 2012
@@ -97,8 +97,13 @@ public class ThriftUtil {
   
   static public <T extends TServiceClient> T getClient(TServiceClientFactory<T>
factory, String address, Property property, Property timeoutProperty,
       AccumuloConfiguration configuration) throws TTransportException {
+    return getClient(factory, address, property, configuration.getTimeInMillis(timeoutProperty),
configuration);
+  }
+  
+  static public <T extends TServiceClient> T getClient(TServiceClientFactory<T>
factory, String address, Property property, long timeout,
+      AccumuloConfiguration configuration) throws TTransportException {
     int port = configuration.getPort(property);
-    TTransport transport = ThriftTransportPool.getInstance().getTransport(address, port,
configuration.getTimeInMillis(timeoutProperty));
+    TTransport transport = ThriftTransportPool.getInstance().getTransport(address, port,
timeout);
     return createClient(factory, transport);
   }
   
@@ -112,6 +117,10 @@ public class ThriftUtil {
     return getClient(new TabletClientService.Client.Factory(), address, Property.TSERV_CLIENTPORT,
Property.GENERAL_RPC_TIMEOUT, conf);
   }
   
+  static public TabletClientService.Client getTServerClient(String address, AccumuloConfiguration
conf, long timeout) throws TTransportException {
+    return getClient(new TabletClientService.Client.Factory(), address, Property.TSERV_CLIENTPORT,
timeout, conf);
+  }
+
   public static void execute(String address, AccumuloConfiguration conf, ClientExec<TabletClientService.Client>
exec) throws AccumuloException,
       AccumuloSecurityException {
     while (true) {

Added: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/functional/TimeoutTest.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/functional/TimeoutTest.java?rev=1370914&view=auto
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/functional/TimeoutTest.java
(added)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/functional/TimeoutTest.java
Wed Aug  8 19:47:57 2012
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.test.functional;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.TimedOutException;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.util.UtilWaitThread;
+
+/**
+ * 
+ */
+public class TimeoutTest extends FunctionalTest {
+  
+  @Override
+  public Map<String,String> getInitialConfig() {
+    return Collections.emptyMap();
+  }
+  
+  @Override
+  public List<TableSetup> getTablesToCreate() {
+    return Collections.emptyList();
+  }
+  
+  @Override
+  public void run() throws Exception {
+
+    getConnector().tableOperations().create("timeout");
+    
+    BatchWriter bw = getConnector().createBatchWriter("timeout", 1000000, 60000, 1);
+    
+    Mutation m = new Mutation("r1");
+    m.put("cf1", "cq1", "v1");
+    m.put("cf1", "cq2", "v2");
+    m.put("cf1", "cq3", "v3");
+    m.put("cf1", "cq4", "v4");
+    
+    bw.addMutation(m);
+    
+    bw.close();
+    
+    BatchScanner bs = getConnector().createBatchScanner("timeout", Constants.NO_AUTHS, 2);
+    bs.setTimeOut(1);
+    bs.setRanges(Collections.singletonList(new Range()));
+    
+    // should not timeout
+    for (Entry<Key,Value> entry : bs) {
+      entry.getKey();
+    }
+    
+    IteratorSetting iterSetting = new IteratorSetting(100, SlowIterator.class);
+    iterSetting.addOption("sleepTime", 2000 + "");
+    getConnector().tableOperations().attachIterator("timeout", iterSetting);
+    UtilWaitThread.sleep(250);
+
+    try {
+      for (Entry<Key,Value> entry : bs) {
+        entry.getKey();
+      }
+      throw new Exception("Did not time out");
+    } catch (TimedOutException toe) {
+      // toe.printStackTrace();
+    }
+
+    bs.close();
+  }
+  
+  @Override
+  public void cleanup() throws Exception {
+    
+  }
+  
+}

Added: accumulo/trunk/test/system/auto/simple/timeout.py
URL: http://svn.apache.org/viewvc/accumulo/trunk/test/system/auto/simple/timeout.py?rev=1370914&view=auto
==============================================================================
--- accumulo/trunk/test/system/auto/simple/timeout.py (added)
+++ accumulo/trunk/test/system/auto/simple/timeout.py Wed Aug  8 19:47:57 2012
@@ -0,0 +1,29 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from JavaTest import JavaTest
+
+import unittest
+
+class TimeoutTest(JavaTest):
+    "Test time out"
+
+    order = 91
+    testClass="org.apache.accumulo.server.test.functional.TimeoutTest"
+
+def suite():
+    result = unittest.TestSuite()
+    result.addTest(TimeoutTest())
+    return result



Mime
View raw message