Return-Path: X-Original-To: apmail-accumulo-commits-archive@www.apache.org Delivered-To: apmail-accumulo-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id A71DBDB16 for ; Wed, 8 Aug 2012 19:48:44 +0000 (UTC) Received: (qmail 19746 invoked by uid 500); 8 Aug 2012 19:48:44 -0000 Delivered-To: apmail-accumulo-commits-archive@accumulo.apache.org Received: (qmail 19700 invoked by uid 500); 8 Aug 2012 19:48:44 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 19689 invoked by uid 99); 8 Aug 2012 19:48:44 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 08 Aug 2012 19:48:44 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 08 Aug 2012 19:48:42 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id BC807238890D for ; Wed, 8 Aug 2012 19:47:58 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1370914 - in /accumulo/trunk: core/src/main/java/org/apache/accumulo/core/client/ core/src/main/java/org/apache/accumulo/core/client/impl/ core/src/main/java/org/apache/accumulo/core/client/mock/ core/src/main/java/org/apache/accumulo/core... Date: Wed, 08 Aug 2012 19:47:58 -0000 To: commits@accumulo.apache.org From: kturner@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120808194758.BC807238890D@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: kturner Date: Wed Aug 8 19:47:57 2012 New Revision: 1370914 URL: http://svn.apache.org/viewvc?rev=1370914&view=rev Log: ACCUMULO-705 added timeout to batch scanner Added: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/TimedOutException.java accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/functional/TimeoutTest.java accumulo/trunk/test/system/auto/simple/timeout.py Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/BatchScanner.java accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/Scanner.java accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerImpl.java accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerOptions.java accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReader.java accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mock/MockBatchScanner.java accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/BatchScanner.java URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/BatchScanner.java?rev=1370914&r1=1370913&r2=1370914&view=diff ============================================================================== --- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/BatchScanner.java (original) +++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/BatchScanner.java Wed Aug 8 19:47:57 2012 @@ -44,4 +44,17 @@ public interface BatchScanner extends Sc * Cleans up and finalizes the scanner */ void close(); + + /** + * Sets a timeout threshold for a server to respond. The batch scanner will accomplish as much work as possible before throwing an exception. BatchScanner + * iterators will throw a {@link TimedOutException} when all needed servers timeout. + * + *

+ * If not set, the timeout defaults to MAX_INT + * + * @param timeout + * in seconds + */ + @Override + void setTimeOut(int timeout); } Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/Scanner.java URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/Scanner.java?rev=1370914&r1=1370913&r2=1370914&view=diff ============================================================================== --- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/Scanner.java (original) +++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/Scanner.java Wed Aug 8 19:47:57 2012 @@ -28,21 +28,6 @@ import org.apache.accumulo.core.data.Ran public interface Scanner extends ScannerBase { /** - * This setting determines how long a scanner will automatically retry when a failure occurs. By default a scanner will retry forever. - * - * @param timeOut - * in seconds - */ - public void setTimeOut(int timeOut); - - /** - * Returns the setting for how long a scanner will automatically retry when a failure occurs. - * - * @return the timeout configured for this scanner - */ - public int getTimeOut(); - - /** * Sets the range of keys to scan over. * * @param range Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java?rev=1370914&r1=1370913&r2=1370914&view=diff ============================================================================== --- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java (original) +++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java Wed Aug 8 19:47:57 2012 @@ -100,4 +100,19 @@ public interface ScannerBase extends Ite * @return an iterator over Key,Value pairs which meet the restrictions set on the scanner */ public Iterator> iterator(); + + /** + * This setting determines how long a scanner will automatically retry when a failure occurs. By default a scanner will retry forever. + * + * @param timeOut + * in seconds + */ + public void setTimeOut(int timeOut); + + /** + * Returns the setting for how long a scanner will automatically retry when a failure occurs. + * + * @return the timeout configured for this scanner + */ + public int getTimeOut(); } Added: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/TimedOutException.java URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/TimedOutException.java?rev=1370914&view=auto ============================================================================== --- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/TimedOutException.java (added) +++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/TimedOutException.java Wed Aug 8 19:47:57 2012 @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.client; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Set; + +/** + * + */ +public class TimedOutException extends RuntimeException { + + private Set timedoutServers; + + private static final long serialVersionUID = 1L; + + private static String shorten(Set set) { + if (set.size() < 10) { + return set.toString(); + } + + return new ArrayList(set).subList(0, 10).toString() + " ... " + (set.size() - 10) + " servers not shown"; + } + + public TimedOutException(Set timedoutServers) { + super("Servers timed out " + shorten(timedoutServers)); + this.timedoutServers = timedoutServers; + + } + + public Set getTimedOutSevers() { + return Collections.unmodifiableSet(timedoutServers); + } +} Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerImpl.java URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerImpl.java?rev=1370914&r1=1370913&r2=1370914&view=diff ============================================================================== --- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerImpl.java (original) +++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerImpl.java Wed Aug 8 19:47:57 2012 @@ -71,22 +71,6 @@ public class ScannerImpl extends Scanner this.timeOut = Integer.MAX_VALUE; } - /** - * When failure occurs, the scanner automatically retries. This setting determines how long a scanner will retry. By default a scanner will retry forever. - * - * @param timeOut - * in milliseconds - */ - @Override - public synchronized void setTimeOut(int timeOut) { - this.timeOut = timeOut; - } - - @Override - public synchronized int getTimeOut() { - return timeOut; - } - @Override public synchronized void setRange(Range range) { ArgumentChecker.notNull(range); Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerOptions.java URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerOptions.java?rev=1370914&r1=1370913&r2=1370914&view=diff ============================================================================== --- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerOptions.java (original) +++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerOptions.java Wed Aug 8 19:47:57 2012 @@ -44,6 +44,8 @@ public class ScannerOptions implements S protected SortedSet fetchedColumns = new TreeSet(); + protected int timeOut = Integer.MAX_VALUE; + private String regexIterName = null; protected ScannerOptions() {} @@ -181,4 +183,18 @@ public class ScannerOptions implements S public Iterator> iterator() { throw new UnsupportedOperationException(); } + + @Override + public void setTimeOut(int timeOut) { + if (timeOut <= 0) { + throw new IllegalArgumentException("TimeOut must be positive : " + timeOut); + } + + this.timeOut = timeOut; + } + + @Override + public int getTimeOut() { + return timeOut; + } } Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReader.java URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReader.java?rev=1370914&r1=1370913&r2=1370914&view=diff ============================================================================== --- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReader.java (original) +++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReader.java Wed Aug 8 19:47:57 2012 @@ -107,6 +107,6 @@ public class TabletServerBatchReader ext throw new IllegalStateException("batch reader closed"); } - return new TabletServerBatchReaderIterator(instance, credentials, table, authorizations, ranges, numThreads, queryThreadPool, this); + return new TabletServerBatchReaderIterator(instance, credentials, table, authorizations, ranges, numThreads, queryThreadPool, this, timeOut * 1000l); } } Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java?rev=1370914&r1=1370913&r2=1370914&view=diff ============================================================================== --- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java (original) +++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java Wed Aug 8 19:47:57 2012 @@ -28,6 +28,7 @@ import java.util.ListIterator; import java.util.Map; import java.util.Map.Entry; import java.util.NoSuchElementException; +import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Semaphore; @@ -42,6 +43,7 @@ import org.apache.accumulo.core.client.I import org.apache.accumulo.core.client.TableDeletedException; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.TableOfflineException; +import org.apache.accumulo.core.client.TimedOutException; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.data.Column; import org.apache.accumulo.core.data.Key; @@ -94,6 +96,10 @@ public class TabletServerBatchReaderIter private volatile Throwable fatalException = null; + private Map timeoutTrackers; + private Set timedoutServers; + private long timeout; + public interface ResultReceiver { void receive(List> entries); } @@ -126,7 +132,7 @@ public class TabletServerBatchReaderIter } public TabletServerBatchReaderIterator(Instance instance, AuthInfo credentials, String table, Authorizations authorizations, ArrayList ranges, - int numThreads, ExecutorService queryThreadPool, ScannerOptions scannerOptions) { + int numThreads, ExecutorService queryThreadPool, ScannerOptions scannerOptions, long timeout) { this.instance = instance; this.credentials = credentials; @@ -137,6 +143,10 @@ public class TabletServerBatchReaderIter this.options = new ScannerOptions(scannerOptions); resultsQueue = new ArrayBlockingQueue>>(numThreads); + timeoutTrackers = Collections.synchronizedMap(new HashMap()); + timedoutServers = Collections.synchronizedSet(new HashSet()); + this.timeout = timeout; + if (options.fetchedColumns.size() > 0) { ArrayList ranges2 = new ArrayList(ranges.size()); for (Range range : ranges) { @@ -341,7 +351,13 @@ public class TabletServerBatchReaderIter Map> unscanned = new HashMap>(); Map> tsFailures = new HashMap>(); try { - doLookup(tsLocation, tabletsRanges, tsFailures, unscanned, receiver, columns, credentials, options, authorizations, instance.getConfiguration()); + TimeoutTracker timeoutTracker = timeoutTrackers.get(tsLocation); + if (timeoutTracker == null) { + timeoutTracker = new TimeoutTracker(tsLocation, timedoutServers, timeout); + timeoutTrackers.put(tsLocation, timeoutTracker); + } + doLookup(tsLocation, tabletsRanges, tsFailures, unscanned, receiver, columns, credentials, options, authorizations, instance.getConfiguration(), + timeoutTracker); if (tsFailures.size() > 0) { TabletLocator tabletLocator = TabletLocator.getInstance(instance, credentials, new Text(table)); tabletLocator.invalidateCache(tsFailures.keySet()); @@ -425,6 +441,12 @@ public class TabletServerBatchReaderIter } private void doLookups(Map>> binnedRanges, final ResultReceiver receiver, List columns) { + + if (timedoutServers.containsAll(binnedRanges.keySet())) { + // all servers have timed out + throw new TimedOutException(timedoutServers); + } + // when there are lots of threads and a few tablet servers // it is good to break request to tablet servers up, the // following code determines if this is the case @@ -444,6 +466,17 @@ public class TabletServerBatchReaderIter Map> failures = new HashMap>(); + if (timedoutServers.size() > 0) { + // go ahead and fail any timed out servers + for (Iterator>>> iterator = binnedRanges.entrySet().iterator(); iterator.hasNext();) { + Entry>> entry = iterator.next(); + if (timedoutServers.contains(entry.getKey())) { + failures.putAll(entry.getValue()); + iterator.remove(); + } + } + } + // randomize tabletserver order... this will help when there are multiple // batch readers and writers running against accumulo List locations = new ArrayList(binnedRanges.keySet()); @@ -516,9 +549,65 @@ public class TabletServerBatchReaderIter } } + private static class TimeoutTracker { + + String server; + Set badServers; + long timeOut; + long activityTime; + Long firstErrorTime = null; + + TimeoutTracker(String server, Set badServers, long timeOut) { + this(timeOut); + this.server = server; + this.badServers = badServers; + } + + TimeoutTracker(long timeOut) { + this.timeOut = timeOut; + } + + void startingScan() { + activityTime = System.currentTimeMillis(); + } + + void check() throws IOException { + if (System.currentTimeMillis() - activityTime > timeOut) { + badServers.add(server); + throw new IOException("Time exceeded " + (System.currentTimeMillis() - activityTime) + " " + server); + } + } + + void madeProgress() { + activityTime = System.currentTimeMillis(); + firstErrorTime = null; + } + + void errorOccured(Exception e) { + if (firstErrorTime == null) { + firstErrorTime = activityTime; + } else if (System.currentTimeMillis() - firstErrorTime > timeOut) { + badServers.add(server); + } + } + + /** + * @return + */ + public long getTimeOut() { + return timeOut; + } + } + static void doLookup(String server, Map> requested, Map> failures, Map> unscanned, ResultReceiver receiver, List columns, AuthInfo credentials, ScannerOptions options, Authorizations authorizations, AccumuloConfiguration conf) throws IOException, AccumuloSecurityException, AccumuloServerException { + doLookup(server, requested, failures, unscanned, receiver, columns, credentials, options, authorizations, conf, new TimeoutTracker(Long.MAX_VALUE)); + } + + static void doLookup(String server, Map> requested, Map> failures, Map> unscanned, + ResultReceiver receiver, List columns, AuthInfo credentials, ScannerOptions options, Authorizations authorizations, AccumuloConfiguration conf, + TimeoutTracker timeoutTracker) throws IOException, AccumuloSecurityException, AccumuloServerException { if (requested.size() == 0) { return; @@ -533,10 +622,19 @@ public class TabletServerBatchReaderIter unscanned.put(new KeyExtent(entry.getKey()), ranges); } + timeoutTracker.startingScan(); TTransport transport = null; try { - TabletClientService.Client client = ThriftUtil.getTServerClient(server, conf); + TabletClientService.Client client; + if (timeoutTracker.getTimeOut() < Integer.MAX_VALUE * 1000l) + client = ThriftUtil.getTServerClient(server, conf, timeoutTracker.getTimeOut()); + else + client = ThriftUtil.getTServerClient(server, conf); + try { + + + OpTimer opTimer = new OpTimer(log, Level.TRACE).start("Starting multi scan, tserver=" + server + " #tablets=" + requested.size() + " #ranges=" + sumSizes(requested.values()) + " ssil=" + options.serverSideIteratorList + " ssio=" + options.serverSideIteratorOptions); @@ -563,10 +661,15 @@ public class TabletServerBatchReaderIter if (entries.size() > 0) receiver.receive(entries); + if (entries.size() > 0 || scanResult.fullScans.size() > 0) + timeoutTracker.madeProgress(); + trackScanning(failures, unscanned, scanResult); while (scanResult.more) { + timeoutTracker.check(); + opTimer.start("Continuing multi scan, scanid=" + imsr.scanID); scanResult = client.continueMultiScan(Tracer.traceInfo(), imsr.scanID); opTimer.stop("Got more multi scan results, #results=" + scanResult.results.size() + (scanResult.more ? " scanID=" + imsr.scanID : "") @@ -579,6 +682,10 @@ public class TabletServerBatchReaderIter if (entries.size() > 0) receiver.receive(entries); + + if (entries.size() > 0 || scanResult.fullScans.size() > 0) + timeoutTracker.madeProgress(); + trackScanning(failures, unscanned, scanResult); } @@ -589,6 +696,7 @@ public class TabletServerBatchReaderIter } } catch (TTransportException e) { log.debug("Server : " + server + " msg : " + e.getMessage()); + timeoutTracker.errorOccured(e); throw new IOException(e); } catch (ThriftSecurityException e) { log.debug("Server : " + server + " msg : " + e.getMessage(), e); @@ -598,6 +706,7 @@ public class TabletServerBatchReaderIter throw new AccumuloServerException(server, e); } catch (TException e) { log.debug("Server : " + server + " msg : " + e.getMessage(), e); + timeoutTracker.errorOccured(e); throw new IOException(e); } catch (NoSuchScanIDException e) { log.debug("Server : " + server + " msg : " + e.getMessage(), e); Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mock/MockBatchScanner.java URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mock/MockBatchScanner.java?rev=1370914&r1=1370913&r2=1370914&view=diff ============================================================================== --- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mock/MockBatchScanner.java (original) +++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mock/MockBatchScanner.java Wed Aug 8 19:47:57 2012 @@ -92,4 +92,11 @@ public class MockBatchScanner extends Mo @Override public void close() {} + @Override + public void setTimeOut(int timeout) {} + + @Override + public int getTimeOut() { + return Integer.MAX_VALUE; + } } Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java?rev=1370914&r1=1370913&r2=1370914&view=diff ============================================================================== --- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java (original) +++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java Wed Aug 8 19:47:57 2012 @@ -97,8 +97,13 @@ public class ThriftUtil { static public T getClient(TServiceClientFactory factory, String address, Property property, Property timeoutProperty, AccumuloConfiguration configuration) throws TTransportException { + return getClient(factory, address, property, configuration.getTimeInMillis(timeoutProperty), configuration); + } + + static public T getClient(TServiceClientFactory factory, String address, Property property, long timeout, + AccumuloConfiguration configuration) throws TTransportException { int port = configuration.getPort(property); - TTransport transport = ThriftTransportPool.getInstance().getTransport(address, port, configuration.getTimeInMillis(timeoutProperty)); + TTransport transport = ThriftTransportPool.getInstance().getTransport(address, port, timeout); return createClient(factory, transport); } @@ -112,6 +117,10 @@ public class ThriftUtil { return getClient(new TabletClientService.Client.Factory(), address, Property.TSERV_CLIENTPORT, Property.GENERAL_RPC_TIMEOUT, conf); } + static public TabletClientService.Client getTServerClient(String address, AccumuloConfiguration conf, long timeout) throws TTransportException { + return getClient(new TabletClientService.Client.Factory(), address, Property.TSERV_CLIENTPORT, timeout, conf); + } + public static void execute(String address, AccumuloConfiguration conf, ClientExec exec) throws AccumuloException, AccumuloSecurityException { while (true) { Added: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/functional/TimeoutTest.java URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/functional/TimeoutTest.java?rev=1370914&view=auto ============================================================================== --- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/functional/TimeoutTest.java (added) +++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/functional/TimeoutTest.java Wed Aug 8 19:47:57 2012 @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.test.functional; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.BatchScanner; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.TimedOutException; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.util.UtilWaitThread; + +/** + * + */ +public class TimeoutTest extends FunctionalTest { + + @Override + public Map getInitialConfig() { + return Collections.emptyMap(); + } + + @Override + public List getTablesToCreate() { + return Collections.emptyList(); + } + + @Override + public void run() throws Exception { + + getConnector().tableOperations().create("timeout"); + + BatchWriter bw = getConnector().createBatchWriter("timeout", 1000000, 60000, 1); + + Mutation m = new Mutation("r1"); + m.put("cf1", "cq1", "v1"); + m.put("cf1", "cq2", "v2"); + m.put("cf1", "cq3", "v3"); + m.put("cf1", "cq4", "v4"); + + bw.addMutation(m); + + bw.close(); + + BatchScanner bs = getConnector().createBatchScanner("timeout", Constants.NO_AUTHS, 2); + bs.setTimeOut(1); + bs.setRanges(Collections.singletonList(new Range())); + + // should not timeout + for (Entry entry : bs) { + entry.getKey(); + } + + IteratorSetting iterSetting = new IteratorSetting(100, SlowIterator.class); + iterSetting.addOption("sleepTime", 2000 + ""); + getConnector().tableOperations().attachIterator("timeout", iterSetting); + UtilWaitThread.sleep(250); + + try { + for (Entry entry : bs) { + entry.getKey(); + } + throw new Exception("Did not time out"); + } catch (TimedOutException toe) { + // toe.printStackTrace(); + } + + bs.close(); + } + + @Override + public void cleanup() throws Exception { + + } + +} Added: accumulo/trunk/test/system/auto/simple/timeout.py URL: http://svn.apache.org/viewvc/accumulo/trunk/test/system/auto/simple/timeout.py?rev=1370914&view=auto ============================================================================== --- accumulo/trunk/test/system/auto/simple/timeout.py (added) +++ accumulo/trunk/test/system/auto/simple/timeout.py Wed Aug 8 19:47:57 2012 @@ -0,0 +1,29 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from JavaTest import JavaTest + +import unittest + +class TimeoutTest(JavaTest): + "Test time out" + + order = 91 + testClass="org.apache.accumulo.server.test.functional.TimeoutTest" + +def suite(): + result = unittest.TestSuite() + result.addTest(TimeoutTest()) + return result