accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e..@apache.org
Subject svn commit: r1344410 - in /accumulo/branches/ACCUMULO-578: ./ core/ core/src/main/java/org/apache/accumulo/core/client/ core/src/main/java/org/apache/accumulo/core/client/impl/ core/src/main/java/org/apache/accumulo/core/client/mock/ server/ server/src...
Date Wed, 30 May 2012 18:46:37 GMT
Author: ecn
Date: Wed May 30 18:46:36 2012
New Revision: 1344410

URL: http://svn.apache.org/viewvc?rev=1344410&view=rev
Log:
ACCUMULO-578 merge trunk to sandbox

Modified:
    accumulo/branches/ACCUMULO-578/   (props changed)
    accumulo/branches/ACCUMULO-578/core/   (props changed)
    accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java
    accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/IsolatedScanner.java
    accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/Scanner.java
    accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java
    accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/impl/MasterClient.java
    accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java
    accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerImpl.java
    accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerOptions.java
    accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java
    accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/mock/MockScanner.java
    accumulo/branches/ACCUMULO-578/server/   (props changed)
    accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/master/Master.java
    accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java
    accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java
    accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/MinorCompactor.java
    accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/test/functional/BadIteratorMincTest.java
    accumulo/branches/ACCUMULO-578/src/   (props changed)

Propchange: accumulo/branches/ACCUMULO-578/
------------------------------------------------------------------------------
  Merged /accumulo/branches/1.4/src:r1342421-1343896,1343899-1343942
  Merged /accumulo/trunk:r1343669-1344408
  Merged /accumulo/branches/1.4:r1341135-1342418,1342420-1343942

Propchange: accumulo/branches/ACCUMULO-578/core/
------------------------------------------------------------------------------
  Merged /accumulo/trunk/core:r1343669-1344408
  Merged /accumulo/branches/1.4/core:r1341135-1342418,1342420-1343942
  Merged /accumulo/branches/1.4/src/core:r1342421-1343896,1343899-1343942

Modified: accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java?rev=1344410&r1=1344409&r2=1344410&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java
(original)
+++ accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java
Wed May 30 18:46:36 2012
@@ -25,6 +25,7 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
 
+import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.impl.ScannerOptions;
 import org.apache.accumulo.core.client.mock.IteratorAdapter;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
@@ -52,7 +53,9 @@ import org.apache.hadoop.io.Text;
  * the source scanner (which will execute server side) and to the client side scanner (which
will execute client side).
  */
 public class ClientSideIteratorScanner extends ScannerOptions implements Scanner {
+  private int size;
   private int timeOut;
+  
   private Range range;
   private boolean isolated = false;
   
@@ -131,9 +134,9 @@ public class ClientSideIteratorScanner e
    */
   public ClientSideIteratorScanner(Scanner scanner) {
     smi = new ScannerTranslator(scanner);
-    setRange(scanner.getRange());
-    setBatchSize(scanner.getBatchSize());
-    setTimeOut(scanner.getTimeOut());
+    this.range = new Range((Key) null, (Key) null);
+    this.size = Constants.SCAN_BATCH_SIZE;
+    this.timeOut = Integer.MAX_VALUE;
   }
   
   /**
@@ -147,7 +150,7 @@ public class ClientSideIteratorScanner e
   
   @Override
   public Iterator<Entry<Key,Value>> iterator() {
-    smi.scanner.setBatchSize(getBatchSize());
+    smi.scanner.setBatchSize(size);
     smi.scanner.setTimeOut(timeOut);
     if (isolated)
       smi.scanner.enableIsolation();
@@ -225,6 +228,16 @@ public class ClientSideIteratorScanner e
   }
   
   @Override
+  public void setBatchSize(int size) {
+    this.size = size;
+  }
+  
+  @Override
+  public int getBatchSize() {
+    return size;
+  }
+  
+  @Override
   public void enableIsolation() {
     this.isolated = true;
   }

Modified: accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/IsolatedScanner.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/IsolatedScanner.java?rev=1344410&r1=1344409&r2=1344410&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/IsolatedScanner.java
(original)
+++ accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/IsolatedScanner.java
Wed May 30 18:46:36 2012
@@ -20,6 +20,7 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.Map.Entry;
 
+import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.impl.IsolationException;
 import org.apache.accumulo.core.client.impl.ScannerOptions;
 import org.apache.accumulo.core.data.ByteSequence;
@@ -209,6 +210,7 @@ public class IsolatedScanner extends Sca
   private Scanner scanner;
   private Range range;
   private int timeOut;
+  private int batchSize;
   private RowBufferFactory bufferFactory;
   
   public IsolatedScanner(Scanner scanner) {
@@ -217,15 +219,15 @@ public class IsolatedScanner extends Sca
   
   public IsolatedScanner(Scanner scanner, RowBufferFactory bufferFactory) {
     this.scanner = scanner;
-    setRange(scanner.getRange());
-    setBatchSize(scanner.getBatchSize());
-    setTimeOut(scanner.getTimeOut());
+    this.range = new Range();
+    this.timeOut = Integer.MAX_VALUE;
+    this.batchSize = Constants.SCAN_BATCH_SIZE;
     this.bufferFactory = bufferFactory;
   }
   
   @Override
   public Iterator<Entry<Key,Value>> iterator() {
-    return new RowBufferingIterator(scanner, this, range, timeOut, getBatchSize(), bufferFactory);
+    return new RowBufferingIterator(scanner, this, range, timeOut, batchSize, bufferFactory);
   }
   
   @Override
@@ -250,6 +252,16 @@ public class IsolatedScanner extends Sca
   }
   
   @Override
+  public void setBatchSize(int size) {
+    this.batchSize = size;
+  }
+  
+  @Override
+  public int getBatchSize() {
+    return batchSize;
+  }
+  
+  @Override
   public void enableIsolation() {
     // aye aye captain, already done sir
   }

Modified: accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/Scanner.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/Scanner.java?rev=1344410&r1=1344409&r2=1344410&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/Scanner.java
(original)
+++ accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/Scanner.java
Wed May 30 18:46:36 2012
@@ -58,6 +58,21 @@ public interface Scanner extends Scanner
   public Range getRange();
   
   /**
+   * Sets the number of Key/Value pairs that will be fetched at a time from a tablet server.
+   * 
+   * @param size
+   *          the number of Key/Value pairs to fetch per call to Accumulo
+   */
+  public void setBatchSize(int size);
+  
+  /**
+   * Returns the batch size (number of Key/Value pairs) that will be fetched at a time from
a tablet server.
+   * 
+   * @return the batch size configured for this scanner
+   */
+  public int getBatchSize();
+  
+  /**
    * Enables row isolation. Writes that occur to a row after a scan of that row has begun
will not be seen if this option is enabled.
    */
   public void enableIsolation();

Modified: accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java?rev=1344410&r1=1344409&r2=1344410&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java
(original)
+++ accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java
Wed May 30 18:46:36 2012
@@ -92,21 +92,6 @@ public interface ScannerBase extends Ite
   public void clearScanIterators();
   
   /**
-   * Sets the number of Key/Value pairs that will be fetched at a time from a tablet server.
-   * 
-   * @param size
-   *          the number of Key/Value pairs to fetch per call to Accumulo
-   */
-  public void setBatchSize(int size);
-  
-  /**
-   * Returns the batch size (number of Key/Value pairs) that will be fetched at a time from
a tablet server.
-   * 
-   * @return the batch size configured for this scanner
-   */
-  public int getBatchSize();
-  
-  /**
    * Returns an iterator over an accumulo table. This iterator uses the options that are
currently set for its lifetime, so setting options will have no effect
    * on existing iterators.
    * 

Modified: accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/impl/MasterClient.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/impl/MasterClient.java?rev=1344410&r1=1344409&r2=1344410&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/impl/MasterClient.java
(original)
+++ accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/impl/MasterClient.java
Wed May 30 18:46:36 2012
@@ -16,6 +16,7 @@
  */
 package org.apache.accumulo.core.client.impl;
 
+import java.net.UnknownHostException;
 import java.util.List;
 
 import org.apache.accumulo.core.client.AccumuloException;
@@ -34,7 +35,7 @@ import org.apache.thrift.transport.TTran
 public class MasterClient {
   private static final Logger log = Logger.getLogger(MasterClient.class);
   
-  public static MasterClientService.Iface getConnectionWithRetry(Instance instance) throws
TTransportException {
+  public static MasterClientService.Iface getConnectionWithRetry(Instance instance) {
     ArgumentChecker.notNull(instance);
     
     while (true) {
@@ -64,6 +65,10 @@ public class MasterClient {
           instance.getConfiguration());
       return client;
     } catch (TTransportException tte) {
+      if (tte.getCause().getClass().equals(UnknownHostException.class)) {
+        // do not expect to recover from this
+        throw new RuntimeException(tte);
+      }
       log.debug("Failed to connect to master=" + master + " portHint=" + portHint + ", will
retry... ", tte);
       return null;
     }

Modified: accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java?rev=1344410&r1=1344409&r2=1344410&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java
(original)
+++ accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java
Wed May 30 18:46:36 2012
@@ -104,7 +104,7 @@ class OfflineIterator implements Iterato
       return new MultiIterator(allIters, false);
     }
   }
-  
+
   private SortedKeyValueIterator<Key,Value> iter;
   private Range range;
   private KeyExtent currentExtent;
@@ -114,7 +114,7 @@ class OfflineIterator implements Iterato
   private Instance instance;
   private ScannerOptions options;
   private ArrayList<SortedKeyValueIterator<Key,Value>> readers;
-  
+
   /**
    * @param offlineScanner
    * @param instance
@@ -130,7 +130,7 @@ class OfflineIterator implements Iterato
     if (this.options.fetchedColumns.size() > 0) {
       range = range.bound(this.options.fetchedColumns.first(), this.options.fetchedColumns.last());
     }
-    
+
     this.tableId = table.toString();
     this.authorizations = authorizations;
     this.readers = new ArrayList<SortedKeyValueIterator<Key,Value>>();
@@ -141,12 +141,12 @@ class OfflineIterator implements Iterato
       
       while (iter != null && !iter.hasTop())
         nextTablet();
-      
+
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
   }
-  
+
   @Override
   public boolean hasNext() {
     return iter != null && iter.hasTop();
@@ -158,7 +158,7 @@ class OfflineIterator implements Iterato
       byte[] v = iter.getTopValue().get();
       // copy just like tablet server does, do this before calling next
       KeyValue ret = new KeyValue(new Key(iter.getTopKey()), Arrays.copyOf(v, v.length));
-      
+
       iter.next();
       
       while (iter != null && !iter.hasTop())
@@ -195,19 +195,19 @@ class OfflineIterator implements Iterato
         iter = null;
         return;
       }
-      
+
       if (range.afterEndKey(new Key(currentExtent.getEndRow()).followingKey(PartialKey.ROW)))
{
         iter = null;
         return;
       }
-      
+
       nextRange = new Range(currentExtent.getMetadataEntry(), false, null, false);
     }
-    
+
     List<String> relFiles = new ArrayList<String>();
     
     Pair<KeyExtent,String> eloc = getTabletFiles(nextRange, relFiles);
-    
+
     while (eloc.getSecond() != null) {
       if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) {
         Tables.clearCache(instance);
@@ -226,7 +226,7 @@ class OfflineIterator implements Iterato
     if (!extent.getTableId().toString().equals(tableId)) {
       throw new AccumuloException(" did not find tablets for table " + tableId + " " + extent);
     }
-    
+
     if (currentExtent != null && !extent.isPreviousExtent(currentExtent))
       throw new AccumuloException(" " + currentExtent + " is not previous extent " + extent);
     
@@ -259,7 +259,7 @@ class OfflineIterator implements Iterato
     while (row.hasNext()) {
       Entry<Key,Value> entry = row.next();
       Key key = entry.getKey();
-      
+
       if (key.getColumnFamily().equals(Constants.METADATA_DATAFILE_COLUMN_FAMILY)) {
         relFiles.add(key.getColumnQualifier().toString());
       }
@@ -272,11 +272,11 @@ class OfflineIterator implements Iterato
       if (Constants.METADATA_PREV_ROW_COLUMN.hasColumns(key)) {
         extent = new KeyExtent(key.getRow(), entry.getValue());
       }
-      
+
     }
     return new Pair<KeyExtent,String>(extent, location);
   }
-  
+
   /**
    * @param absFiles
    * @return
@@ -299,7 +299,7 @@ class OfflineIterator implements Iterato
     }
     
     readers.clear();
-    
+
     // TODO need to close files
     for (String file : absFiles) {
       FileSKVIterator reader = FileOperations.getInstance().openReader(file, false, fs, conf,
acuTableConf, null, null);
@@ -326,7 +326,7 @@ class OfflineIterator implements Iterato
     return iterEnv.getTopLevelIterator(IteratorUtil.loadIterators(IteratorScope.scan, visFilter,
extent, acuTableConf, options.serverSideIteratorList,
         options.serverSideIteratorOptions, iterEnv, false));
   }
-  
+
   @Override
   public void remove() {
     throw new UnsupportedOperationException();
@@ -339,6 +339,7 @@ class OfflineIterator implements Iterato
  */
 public class OfflineScanner extends ScannerOptions implements Scanner {
   
+  private int batchSize;
   private int timeOut;
   private Range range;
   
@@ -353,12 +354,13 @@ public class OfflineScanner extends Scan
     this.credentials = credentials;
     this.tableId = new Text(tableId);
     this.range = new Range((Key) null, (Key) null);
-    
+
     this.authorizations = authorizations;
     
+    this.batchSize = Constants.SCAN_BATCH_SIZE;
     this.timeOut = Integer.MAX_VALUE;
   }
-  
+
   @Override
   public void setTimeOut(int timeOut) {
     this.timeOut = timeOut;
@@ -380,6 +382,16 @@ public class OfflineScanner extends Scan
   }
   
   @Override
+  public void setBatchSize(int size) {
+    this.batchSize = size;
+  }
+  
+  @Override
+  public int getBatchSize() {
+    return batchSize;
+  }
+  
+  @Override
   public void enableIsolation() {
     
   }
@@ -393,5 +405,5 @@ public class OfflineScanner extends Scan
   public Iterator<Entry<Key,Value>> iterator() {
     return new OfflineIterator(this, instance, credentials, authorizations, tableId, range);
   }
-  
+
 }

Modified: accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerImpl.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerImpl.java?rev=1344410&r1=1344409&r2=1344410&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerImpl.java
(original)
+++ accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerImpl.java
Wed May 30 18:46:36 2012
@@ -30,6 +30,7 @@ package org.apache.accumulo.core.client.
 import java.util.Iterator;
 import java.util.Map.Entry;
 
+import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.data.Key;
@@ -52,6 +53,7 @@ public class ScannerImpl extends Scanner
   private Authorizations authorizations;
   private Text table;
   
+  private int size;
   private int timeOut;
   
   private Range range;
@@ -65,6 +67,7 @@ public class ScannerImpl extends Scanner
     this.range = new Range((Key) null, (Key) null);
     this.authorizations = authorizations;
     
+    this.size = Constants.SCAN_BATCH_SIZE;
     this.timeOut = Integer.MAX_VALUE;
   }
   
@@ -95,12 +98,25 @@ public class ScannerImpl extends Scanner
     return range;
   }
   
+  @Override
+  public synchronized void setBatchSize(int size) {
+    if (size > 0)
+      this.size = size;
+    else
+      throw new IllegalArgumentException("size must be greater than zero");
+  }
+  
+  @Override
+  public synchronized int getBatchSize() {
+    return size;
+  }
+  
   /**
    * Returns an iterator over an accumulo table. This iterator uses the options that are
currently set on the scanner for its lifetime. So setting options on a
    * Scanner object will have no effect on existing iterators.
    */
   public synchronized Iterator<Entry<Key,Value>> iterator() {
-    return new ScannerIterator(instance, credentials, table, authorizations, range, getBatchSize(),
timeOut, this, isolated);
+    return new ScannerIterator(instance, credentials, table, authorizations, range, size,
timeOut, this, isolated);
   }
   
   @Override

Modified: accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerOptions.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerOptions.java?rev=1344410&r1=1344409&r2=1344410&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerOptions.java
(original)
+++ accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerOptions.java
Wed May 30 18:46:36 2012
@@ -27,7 +27,6 @@ import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
 
-import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.ScannerBase;
 import org.apache.accumulo.core.data.Column;
@@ -47,8 +46,6 @@ public class ScannerOptions implements S
   
   private String regexIterName = null;
   
-  private int size = Constants.SCAN_BATCH_SIZE;
-  
   protected ScannerOptions() {}
   
   public ScannerOptions(ScannerOptions so) {
@@ -102,7 +99,7 @@ public class ScannerOptions implements S
     
     serverSideIteratorOptions.remove(iteratorName);
   }
-  
+    
   /**
    * Override any existing options on the given named iterator
    */
@@ -181,19 +178,6 @@ public class ScannerOptions implements S
   }
   
   @Override
-  public synchronized void setBatchSize(int size) {
-    if (size > 0)
-      this.size = size;
-    else
-      throw new IllegalArgumentException("size must be greater than zero");
-  }
-  
-  @Override
-  public synchronized int getBatchSize() {
-    return size;
-  }
-  
-  @Override
   public Iterator<Entry<Key,Value>> iterator() {
     throw new UnsupportedOperationException();
   }

Modified: accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java?rev=1344410&r1=1344409&r2=1344410&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java
(original)
+++ accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java
Wed May 30 18:46:36 2012
@@ -69,6 +69,7 @@ import org.apache.thrift.TException;
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
 
+
 public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value>>
{
   
   private static final Logger log = Logger.getLogger(TabletServerBatchReaderIterator.class);
@@ -81,7 +82,7 @@ public class TabletServerBatchReaderIter
   private final ExecutorService queryThreadPool;
   private final ScannerOptions options;
   
-  private ArrayBlockingQueue<Entry<Key,Value>> resultsQueue;
+  private ArrayBlockingQueue<Entry<Key,Value>> resultsQueue = new ArrayBlockingQueue<Entry<Key,Value>>(1000);
   private Entry<Key,Value> nextEntry = null;
   private Object nextLock = new Object();
   
@@ -130,7 +131,6 @@ public class TabletServerBatchReaderIter
     this.numThreads = numThreads;
     this.queryThreadPool = queryThreadPool;
     this.options = new ScannerOptions(scannerOptions);
-    this.resultsQueue = new ArrayBlockingQueue<Entry<Key,Value>>(this.options.getBatchSize());
     
     if (options.fetchedColumns.size() > 0) {
       ArrayList<Range> ranges2 = new ArrayList<Range>(ranges.size());
@@ -178,7 +178,7 @@ public class TabletServerBatchReaderIter
       
       // don't have one cached, try to cache one and return success
       try {
-        while (nextEntry == null && fatalException == null)
+        while (nextEntry == null && fatalException == null && !queryThreadPool.isShutdown())
           nextEntry = resultsQueue.poll(1, TimeUnit.SECONDS);
         
         if (fatalException != null)
@@ -187,6 +187,9 @@ public class TabletServerBatchReaderIter
           else
             throw new RuntimeException(fatalException);
         
+        if (queryThreadPool.isShutdown())
+          throw new RuntimeException("scanner closed");
+
         return nextEntry.getKey() != null && nextEntry.getValue() != null;
       } catch (InterruptedException e) {
         throw new RuntimeException(e);

Modified: accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/mock/MockScanner.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/mock/MockScanner.java?rev=1344410&r1=1344409&r2=1344410&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/mock/MockScanner.java
(original)
+++ accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/mock/MockScanner.java
Wed May 30 18:46:36 2012
@@ -32,6 +32,7 @@ import org.apache.accumulo.core.security
 public class MockScanner extends MockScannerBase implements Scanner {
   
   int timeOut = 0;
+  int batchSize = 0;
   Range range = new Range();
   
   MockScanner(MockTable table, Authorizations auths) {
@@ -59,6 +60,16 @@ public class MockScanner extends MockSca
   }
   
   @Override
+  public void setBatchSize(int size) {
+    this.batchSize = size;
+  }
+  
+  @Override
+  public int getBatchSize() {
+    return this.batchSize;
+  }
+  
+  @Override
   public void enableIsolation() {}
   
   @Override

Propchange: accumulo/branches/ACCUMULO-578/server/
------------------------------------------------------------------------------
  Merged /accumulo/branches/1.4/server:r1341135-1342418,1342420-1343942
  Merged /accumulo/branches/1.4/src/server:r1342421-1343896,1343899-1343942
  Merged /accumulo/trunk/server:r1343669-1344408

Modified: accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/master/Master.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/master/Master.java?rev=1344410&r1=1344409&r2=1344410&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/master/Master.java
(original)
+++ accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/master/Master.java
Wed May 30 18:46:36 2012
@@ -2132,7 +2132,8 @@ public class Master implements LiveTServ
     };
     long current = System.currentTimeMillis();
     final long waitTime = getSystemConfiguration().getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT);
-    final String masterClientAddress = hostname + ":" + getSystemConfiguration().getPort(Property.MASTER_CLIENTPORT);
+    final String masterClientAddress = org.apache.accumulo.core.util.AddressUtil.toString(new
InetSocketAddress(hostname, getSystemConfiguration().getPort(
+        Property.MASTER_CLIENTPORT)));
     
     boolean locked = false;
     while (System.currentTimeMillis() - current < waitTime) {

Modified: accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java?rev=1344410&r1=1344409&r2=1344410&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java
(original)
+++ accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java
Wed May 30 18:46:36 2012
@@ -306,35 +306,88 @@ public class Monitor {
         if (mmi == null)
           UtilWaitThread.sleep(1000);
       }
-      
-      int majorCompactions = 0;
-      int minorCompactions = 0;
-      
-      lookupRateTracker.startingUpdates();
-      indexCacheHitTracker.startingUpdates();
-      indexCacheRequestTracker.startingUpdates();
-      dataCacheHitTracker.startingUpdates();
-      dataCacheRequestTracker.startingUpdates();
-      
-      for (TabletServerStatus server : mmi.tServerInfo) {
-        TableInfo summary = Monitor.summarizeTableStats(server);
-        totalIngestRate += summary.ingestRate;
-        totalIngestByteRate += summary.ingestByteRate;
-        totalQueryRate += summary.queryRate;
-        totalScanRate += summary.scanRate;
-        totalQueryByteRate += summary.queryByteRate;
-        totalEntries += summary.recs;
-        totalHoldTime += server.holdTime;
-        totalLookups += server.lookups;
-        majorCompactions += summary.major.running;
-        minorCompactions += summary.minor.running;
-        lookupRateTracker.updateTabletServer(server.name, server.lastContact, server.lookups);
-        indexCacheHitTracker.updateTabletServer(server.name, server.lastContact, server.indexCacheHits);
-        indexCacheRequestTracker.updateTabletServer(server.name, server.lastContact, server.indexCacheRequest);
-        dataCacheHitTracker.updateTabletServer(server.name, server.lastContact, server.dataCacheHits);
-        dataCacheRequestTracker.updateTabletServer(server.name, server.lastContact, server.dataCacheRequest);
+      if (mmi != null) {
+        
+        int majorCompactions = 0;
+        int minorCompactions = 0;
+        
+        lookupRateTracker.startingUpdates();
+        indexCacheHitTracker.startingUpdates();
+        indexCacheRequestTracker.startingUpdates();
+        dataCacheHitTracker.startingUpdates();
+        dataCacheRequestTracker.startingUpdates();
+
+        for (TabletServerStatus server : mmi.tServerInfo) {
+          TableInfo summary = Monitor.summarizeTableStats(server);
+          totalIngestRate += summary.ingestRate;
+          totalIngestByteRate += summary.ingestByteRate;
+          totalQueryRate += summary.queryRate;
+          totalScanRate += summary.scanRate;
+          totalQueryByteRate += summary.queryByteRate;
+          totalEntries += summary.recs;
+          totalHoldTime += server.holdTime;
+          totalLookups += server.lookups;
+          majorCompactions += summary.major.running;
+          minorCompactions += summary.minor.running;
+          lookupRateTracker.updateTabletServer(server.name, server.lastContact, server.lookups);
+          indexCacheHitTracker.updateTabletServer(server.name, server.lastContact, server.indexCacheHits);
+          indexCacheRequestTracker.updateTabletServer(server.name, server.lastContact, server.indexCacheRequest);
+          dataCacheHitTracker.updateTabletServer(server.name, server.lastContact, server.dataCacheHits);
+          dataCacheRequestTracker.updateTabletServer(server.name, server.lastContact, server.dataCacheRequest);
+        }
+        
+        lookupRateTracker.finishedUpdating();
+        indexCacheHitTracker.finishedUpdating();
+        indexCacheRequestTracker.finishedUpdating();
+        dataCacheHitTracker.finishedUpdating();
+        dataCacheRequestTracker.finishedUpdating();
+        
+        int totalTables = 0;
+        for (TableInfo tInfo : mmi.tableMap.values()) {
+          totalTabletCount += tInfo.tablets;
+          onlineTabletCount += tInfo.onlineTablets;
+          totalTables++;
+        }
+        Monitor.totalIngestRate = totalIngestRate;
+        Monitor.totalTables = totalTables;
+        totalIngestByteRate = totalIngestByteRate / 1000000.0;
+        Monitor.totalIngestByteRate = totalIngestByteRate;
+        Monitor.totalQueryRate = totalQueryRate;
+        Monitor.totalScanRate = totalScanRate;
+        totalQueryByteRate = totalQueryByteRate / 1000000.0;
+        Monitor.totalQueryByteRate = totalQueryByteRate;
+        Monitor.totalEntries = totalEntries;
+        Monitor.totalTabletCount = totalTabletCount;
+        Monitor.onlineTabletCount = onlineTabletCount;
+        Monitor.totalHoldTime = totalHoldTime;
+        Monitor.totalLookups = totalLookups;
+        
+        ingestRateOverTime.add(new Pair<Long,Double>(currentTime, totalIngestRate));
+        ingestByteRateOverTime.add(new Pair<Long,Double>(currentTime, totalIngestByteRate));
+        recoveriesOverTime.add(new Pair<Long,Integer>(currentTime, mmi.recovery.size()));
+        
+        double totalLoad = 0.;
+        for (TabletServerStatus status : mmi.tServerInfo) {
+          if (status != null)
+            totalLoad += status.osLoad;
+        }
+        loadOverTime.add(new Pair<Long,Double>(currentTime, totalLoad));
+        
+        minorCompactionsOverTime.add(new Pair<Long,Integer>(currentTime, minorCompactions));
+        majorCompactionsOverTime.add(new Pair<Long,Integer>(currentTime, majorCompactions));
+        
+        lookupsOverTime.add(new Pair<Long,Double>(currentTime, lookupRateTracker.calculateRate()));
+        
+        queryRateOverTime.add(new Pair<Long,Integer>(currentTime, (int) totalQueryRate));
+        queryByteRateOverTime.add(new Pair<Long,Double>(currentTime, totalQueryByteRate));
+        
+        scanRateOverTime.add(new Pair<Long,Integer>(currentTime, (int) totalScanRate));
+        
+        calcCacheHitRate(indexCacheHitRateOverTime, currentTime, indexCacheHitTracker, indexCacheRequestTracker);
+        calcCacheHitRate(dataCacheHitRateOverTime, currentTime, dataCacheHitTracker, dataCacheRequestTracker);
       }
       
+<<<<<<< .working
       lookupRateTracker.finishedUpdating();
       indexCacheHitTracker.finishedUpdating();
       indexCacheRequestTracker.finishedUpdating();

Modified: accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java?rev=1344410&r1=1344409&r2=1344410&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java
(original)
+++ accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java
Wed May 30 18:46:36 2012
@@ -79,7 +79,7 @@ public class Compactor implements Callab
   private CompactionEnv env;
   private Configuration conf;
   private FileSystem fs;
-  private KeyExtent extent;
+  protected KeyExtent extent;
   private List<IteratorSetting> iterators;
   
   Compactor(Configuration conf, FileSystem fs, Map<String,DataFileValue> files, InMemoryMap
imm, String outputFile, boolean propogateDeletes,

Modified: accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/MinorCompactor.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/MinorCompactor.java?rev=1344410&r1=1344409&r2=1344410&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/MinorCompactor.java
(original)
+++ accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/MinorCompactor.java
Wed May 30 18:46:36 2012
@@ -22,10 +22,13 @@ import java.util.Map;
 import java.util.Random;
 
 import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.impl.Tables;
 import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
-import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.core.master.state.tables.TableState;
 import org.apache.accumulo.core.util.MetadataTable.DataFileValue;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.conf.TableConfiguration;
 import org.apache.accumulo.server.problems.ProblemReport;
 import org.apache.accumulo.server.problems.ProblemReports;
@@ -64,6 +67,15 @@ public class MinorCompactor extends Comp
     });
   }
   
+  private boolean isTableDeleting() {
+    try {
+      return Tables.getTableState(HdfsZooInstance.getInstance(), extent.getTableId().toString())
== TableState.DELETING;
+    } catch (Exception e) {
+      log.warn("Failed to determine if table " + extent.getTableId() + " was deleting ",
e);
+      return false; // can not get positive confirmation that its deleting.
+    }
+  }
+
   @Override
   public CompactionStats call() {
     log.debug("Begin minor compaction " + getOutputFile() + " " + getExtent());
@@ -75,7 +87,6 @@ public class MinorCompactor extends Comp
     boolean reportedProblem = false;
     
     do {
-      
       try {
         CompactionStats ret = super.call();
         
@@ -117,6 +128,9 @@ public class MinorCompactor extends Comp
         log.warn("Failed to delete failed MinC file " + getOutputFile() + " " + e.getMessage());
       }
       
+      if (isTableDeleting())
+        return new CompactionStats(0, 0);
+
     } while (true);
   }
   

Modified: accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/test/functional/BadIteratorMincTest.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/test/functional/BadIteratorMincTest.java?rev=1344410&r1=1344409&r2=1344410&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/test/functional/BadIteratorMincTest.java
(original)
+++ accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/test/functional/BadIteratorMincTest.java
Wed May 30 18:46:36 2012
@@ -95,6 +95,26 @@ public class BadIteratorMincTest extends
     
     if (count != 1)
       throw new Exception("Did not see expected # entries " + count);
+    
+    // now try putting bad iterator back and deleting the table
+    getConnector().tableOperations().setProperty("foo", Property.TABLE_ITERATOR_PREFIX.getKey()
+ "minc.badi", "30," + BadIterator.class.getName());
+    bw = getConnector().createBatchWriter("foo", 1000000, 60000l, 2);
+    m = new Mutation(new Text("r2"));
+    m.put(new Text("acf"), new Text("foo"), new Value("1".getBytes()));
+    bw.addMutation(m);
+    bw.close();
+    
+    // make sure property is given time to propagate
+    UtilWaitThread.sleep(1000);
+    
+    getConnector().tableOperations().flush("foo", null, null, false);
+    
+    // make sure the flush has time to start
+    UtilWaitThread.sleep(1000);
+    
+    // this should not hang
+    getConnector().tableOperations().delete("foo");
+
   }
   
 }

Propchange: accumulo/branches/ACCUMULO-578/src/
------------------------------------------------------------------------------
  Merged /accumulo/trunk/src:r1343669-1344408
  Merged /accumulo/branches/1.4/src:r1341135-1342418,1342420-1343942
  Merged /accumulo/branches/1.4:r1342421-1343896
  Merged /accumulo/trunk:r1342452
  Merged /accumulo/branches/1.4/src/src:r1342421-1343896,1343899-1343942



Mime
View raw message