accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject [accumulo] branch master updated: fixes #736 avoid dropping exceptions in scanner (#889)
Date Wed, 16 Jan 2019 17:48:55 GMT
This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/master by this push:
     new e66d103  fixes #736 avoid dropping exceptions in scanner (#889)
e66d103 is described below

commit e66d1031ba4b32e270d3230fe49fa9e839663c50
Author: Keith Turner <kturner@apache.org>
AuthorDate: Wed Jan 16 12:48:50 2019 -0500

    fixes #736 avoid dropping exceptions in scanner (#889)
---
 .../core/client/TableDeletedException.java         |   8 +
 .../core/client/TableOfflineException.java         |  10 ++
 .../core/clientImpl/IsolationException.java        |  10 ++
 .../accumulo/core/clientImpl/ScannerIterator.java  | 162 +++++++++------------
 4 files changed, 97 insertions(+), 93 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/client/TableDeletedException.java b/core/src/main/java/org/apache/accumulo/core/client/TableDeletedException.java
index 644f752..8b1aef5 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/TableDeletedException.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/TableDeletedException.java
@@ -32,6 +32,14 @@ public class TableDeletedException extends RuntimeException {
     this.tableId = tableId;
   }
 
+  /**
+   * @since 2.0.0
+   */
+  public TableDeletedException(String tableId, Exception cause) {
+    super("Table ID " + tableId + " was deleted", cause);
+    this.tableId = tableId;
+  }
+
   public String getTableId() {
     return tableId;
   }
diff --git a/core/src/main/java/org/apache/accumulo/core/client/TableOfflineException.java b/core/src/main/java/org/apache/accumulo/core/client/TableOfflineException.java
index 951274e..7fc4721 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/TableOfflineException.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/TableOfflineException.java
@@ -28,7 +28,17 @@ public class TableOfflineException extends RuntimeException {
     super("Table with ID (" + tableId + ") is offline");
   }
 
+  /**
+   * @since 2.0.0
+   */
   public TableOfflineException(String msg) {
     super(msg);
   }
+
+  /**
+   * @since 2.0.0
+   */
+  public TableOfflineException(Exception cause) {
+    super(cause);
+  }
 }
diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/IsolationException.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/IsolationException.java
index 0dfa904..c5b07a9 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/IsolationException.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/IsolationException.java
@@ -20,4 +20,14 @@ public class IsolationException extends RuntimeException {
 
   private static final long serialVersionUID = 1L;
 
+  /**
+   * @since 2.0.0
+   */
+  public IsolationException(Exception cause) {
+    super(cause);
+  }
+
+  public IsolationException() {
+    super();
+  }
 }
diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerIterator.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerIterator.java
index 011aa50..3dafb75 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerIterator.java
@@ -21,31 +21,26 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map.Entry;
 import java.util.NoSuchElementException;
-import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.SampleNotPresentException;
 import org.apache.accumulo.core.client.TableDeletedException;
-import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.TableOfflineException;
 import org.apache.accumulo.core.clientImpl.ThriftScanner.ScanState;
-import org.apache.accumulo.core.clientImpl.ThriftScanner.ScanTimedOutException;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.KeyValue;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.util.NamingThreadFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-public class ScannerIterator implements Iterator<Entry<Key,Value>> {
+import com.google.common.base.Preconditions;
 
-  private static final Logger log = LoggerFactory.getLogger(ScannerIterator.class);
+public class ScannerIterator implements Iterator<Entry<Key,Value>> {
 
   // scanner options
   private long timeOut;
@@ -56,56 +51,17 @@ public class ScannerIterator implements Iterator<Entry<Key,Value>> {
 
   private ScannerOptions options;
 
-  private ArrayBlockingQueue<Object> synchQ;
+  private Future<List<KeyValue>> readAheadOperation;
 
   private boolean finished = false;
 
-  private boolean readaheadInProgress = false;
   private long batchCount = 0;
   private long readaheadThreshold;
 
-  private static final List<KeyValue> EMPTY_LIST = Collections.emptyList();
-
   private static ThreadPoolExecutor readaheadPool = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 3L,
       TimeUnit.SECONDS, new SynchronousQueue<>(),
       new NamingThreadFactory("Accumulo scanner read ahead thread"));
 
-  private class Reader implements Runnable {
-
-    @Override
-    public void run() {
-
-      try {
-        while (true) {
-          List<KeyValue> currentBatch = ThriftScanner.scan(scanState.context, scanState, timeOut);
-
-          if (currentBatch == null) {
-            synchQ.add(EMPTY_LIST);
-            return;
-          }
-
-          if (currentBatch.size() == 0)
-            continue;
-
-          synchQ.add(currentBatch);
-          return;
-        }
-      } catch (IsolationException | ScanTimedOutException | AccumuloException
-          | AccumuloSecurityException | TableDeletedException | TableOfflineException
-          | SampleNotPresentException e) {
-        log.trace("{}", e.getMessage(), e);
-        synchQ.add(e);
-      } catch (TableNotFoundException e) {
-        log.warn("{}", e.getMessage(), e);
-        synchQ.add(e);
-      } catch (Exception e) {
-        log.error("{}", e.getMessage(), e);
-        synchQ.add(e);
-      }
-    }
-
-  }
-
   ScannerIterator(ClientContext context, Table.ID tableId, Authorizations authorizations,
       Range range, int size, long timeOut, ScannerOptions options, boolean isolated,
       long readaheadThreshold) {
@@ -114,8 +70,6 @@ public class ScannerIterator implements Iterator<Entry<Key,Value>> {
 
     this.options = new ScannerOptions(options);
 
-    synchQ = new ArrayBlockingQueue<>(1);
-
     if (this.options.fetchedColumns.size() > 0) {
       range = range.bound(this.options.fetchedColumns.first(), this.options.fetchedColumns.last());
     }
@@ -133,13 +87,7 @@ public class ScannerIterator implements Iterator<Entry<Key,Value>> {
     iter = null;
   }
 
-  private void initiateReadAhead() {
-    readaheadInProgress = true;
-    readaheadPool.execute(new Reader());
-  }
-
   @Override
-  @SuppressWarnings("unchecked")
   public boolean hasNext() {
     if (finished)
       return false;
@@ -148,58 +96,86 @@ public class ScannerIterator implements Iterator<Entry<Key,Value>> {
       return true;
     }
 
-    // this is done in order to find see if there is another batch to get
+    iter = getNextBatch().iterator();
+    if (!iter.hasNext()) {
+      finished = true;
+      return false;
+    }
 
-    try {
-      if (!readaheadInProgress) {
-        // no read ahead run, fetch the next batch right now
-        new Reader().run();
-      }
+    return true;
+  }
+
+  @Override
+  public Entry<Key,Value> next() {
+    if (hasNext())
+      return iter.next();
+    throw new NoSuchElementException();
+  }
+
+  private void initiateReadAhead() {
+    Preconditions.checkState(readAheadOperation == null);
+    readAheadOperation = readaheadPool.submit(this::readBatch);
+  }
 
-      Object obj = synchQ.take();
+  private List<KeyValue> readBatch() throws Exception {
+    List<KeyValue> batch;
 
-      if (obj instanceof Exception) {
-        finished = true;
-        if (obj instanceof RuntimeException)
-          throw (RuntimeException) obj;
-        else
-          throw new RuntimeException((Exception) obj);
-      }
+    do {
+      batch = ThriftScanner.scan(scanState.context, scanState, timeOut);
+    } while (batch != null && batch.size() == 0);
 
-      List<KeyValue> currentBatch = (List<KeyValue>) obj;
+    return batch == null ? Collections.emptyList() : batch;
+  }
+
+  private List<KeyValue> getNextBatch() {
+
+    List<KeyValue> nextBatch;
 
-      if (currentBatch.size() == 0) {
-        currentBatch = null;
-        finished = true;
-        return false;
+    try {
+      if (readAheadOperation == null) {
+        // no read ahead run, fetch the next batch right now
+        nextBatch = readBatch();
+      } else {
+        nextBatch = readAheadOperation.get();
+        readAheadOperation = null;
       }
-      iter = currentBatch.iterator();
+    } catch (ExecutionException ee) {
+      wrapExecutionException(ee);
+      throw new RuntimeException(ee);
+    } catch (RuntimeException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+
+    if (!nextBatch.isEmpty()) {
       batchCount++;
 
       if (batchCount > readaheadThreshold) {
         // start a thread to read the next batch
         initiateReadAhead();
       }
-
-    } catch (InterruptedException e1) {
-      throw new RuntimeException(e1);
     }
 
-    return true;
-  }
-
-  @Override
-  public Entry<Key,Value> next() {
-    if (hasNext())
-      return iter.next();
-    throw new NoSuchElementException();
+    return nextBatch;
   }
 
-  // just here to satisfy the interface
-  // could make this actually delete things from the database
-  @Override
-  public void remove() {
-    throw new UnsupportedOperationException("remove is not supported in Scanner");
+  private void wrapExecutionException(ExecutionException ee) {
+    // Need preserve the type of exception that was the cause because some code depends on it.
+    // However the cause is an exception that occurred in a background thread, so throwing it would
+    // lose the stack trace for the user thread calling the scanner. Wrapping the exception with the
+    // same type preserves the type and stack traces (foreground and background thread traces) that
+    // are critical for debugging.
+    if (ee.getCause() instanceof IsolationException)
+      throw new IsolationException(ee);
+    if (ee.getCause() instanceof TableDeletedException) {
+      TableDeletedException cause = (TableDeletedException) ee.getCause();
+      throw new TableDeletedException(cause.getTableId(), cause);
+    }
+    if (ee.getCause() instanceof TableOfflineException)
+      throw new TableOfflineException(ee);
+    if (ee.getCause() instanceof SampleNotPresentException)
+      throw new SampleNotPresentException(ee.getCause().getMessage(), ee);
   }
 
 }


Mime
View raw message