accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject [accumulo] branch master updated: fixes #813 close server side session when scanner closed (#905)
Date Thu, 17 Jan 2019 23:11:14 GMT
This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/master by this push:
     new 7bc5d32  fixes #813 close server side session when scanner closed (#905)
7bc5d32 is described below

commit 7bc5d326342ad5673e330e66f513b8cf3b5531e0
Author: Keith Turner <kturner@apache.org>
AuthorDate: Thu Jan 17 18:11:10 2019 -0500

    fixes #813 close server side session when scanner closed (#905)
---
 .../core/client/ClientSideIteratorScanner.java     |  5 ++
 .../accumulo/core/client/IsolatedScanner.java      |  5 ++
 .../accumulo/core/clientImpl/ScannerImpl.java      | 78 ++++++++++++++++++++-
 .../accumulo/core/clientImpl/ScannerIterator.java  | 33 ++++++++-
 .../accumulo/core/clientImpl/ThriftScanner.java    | 20 ++++++
 .../org/apache/accumulo/test/CloseScannerIT.java   | 81 ++++++++++++++++++++++
 6 files changed, 217 insertions(+), 5 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java
b/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java
index e37f4fe..789277b 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java
@@ -382,4 +382,9 @@ public class ClientSideIteratorScanner extends ScannerOptions implements
Scanner
   public SamplerConfiguration getIteratorSamplerConfiguration() {
     return iteratorSamplerConfig;
   }
+
+  @Override
+  public void close() {
+    smi.scanner.close();
+  }
 }
diff --git a/core/src/main/java/org/apache/accumulo/core/client/IsolatedScanner.java b/core/src/main/java/org/apache/accumulo/core/client/IsolatedScanner.java
index c7e09ef..a98d91b 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/IsolatedScanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/IsolatedScanner.java
@@ -283,4 +283,9 @@ public class IsolatedScanner extends ScannerOptions implements Scanner
{
 
     this.readaheadThreshold = batches;
   }
+
+  @Override
+  public void close() {
+    scanner.close();
+  }
 }
diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerImpl.java
index 3d38bb3..4b865f3 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerImpl.java
@@ -19,6 +19,8 @@ package org.apache.accumulo.core.clientImpl;
 import static com.google.common.base.Preconditions.checkArgument;
 
 import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.TimeUnit;
 
@@ -55,6 +57,54 @@ public class ScannerImpl extends ScannerOptions implements Scanner {
   private boolean isolated = false;
   private long readaheadThreshold = Constants.SCANNER_DEFAULT_READAHEAD_THRESHOLD;
 
+  boolean closed = false;
+
+  private static final int MAX_ENTRIES = 16;
+
+  private long iterCount = 0;
+
+  // Create an LRU map of iterators that tracks the MAX_ENTRIES most recently used iterators.
An LRU
+  // map is used to support the use case of a long lived scanner that constantly creates
iterators
+  // and does not read all of the data. For this case do not want iterator tracking to consume
too
+  // much memory. Also it would be best to avoid an RPC storm of close methods for thousands
+  // sessions that may have timed out.
+  private Map<ScannerIterator,Long> iters = new LinkedHashMap<ScannerIterator,Long>(MAX_ENTRIES
+ 1,
+      .75F, true) {
+    private static final long serialVersionUID = 1L;
+
+    // This method is called just after a new entry has been added
+    @Override
+    public boolean removeEldestEntry(Map.Entry<ScannerIterator,Long> eldest) {
+      return size() > MAX_ENTRIES;
+    }
+  };
+
+  /**
+   * This is used for ScannerIterators to report their activity back to the scanner that
created
+   * them.
+   */
+  class Reporter {
+
+    void readBatch(ScannerIterator iter) {
+      synchronized (ScannerImpl.this) {
+        // This iter just had some activity, so access it in map so it becomes the most recently
+        // used.
+        iters.get(iter);
+      }
+    }
+
+    void finished(ScannerIterator iter) {
+      synchronized (ScannerImpl.this) {
+        iters.remove(iter);
+      }
+    }
+  }
+
+  private synchronized void ensureOpen() {
+    if (closed)
+      throw new IllegalArgumentException("Scanner is closed");
+  }
+
   public ScannerImpl(ClientContext context, Table.ID tableId, Authorizations authorizations)
{
     checkArgument(context != null, "context is null");
     checkArgument(tableId != null, "tableId is null");
@@ -69,17 +119,20 @@ public class ScannerImpl extends ScannerOptions implements Scanner {
 
   @Override
   public synchronized void setRange(Range range) {
+    ensureOpen();
     checkArgument(range != null, "range is null");
     this.range = range;
   }
 
   @Override
   public synchronized Range getRange() {
+    ensureOpen();
     return range;
   }
 
   @Override
   public synchronized void setBatchSize(int size) {
+    ensureOpen();
     if (size > 0)
       this.size = size;
     else
@@ -88,32 +141,42 @@ public class ScannerImpl extends ScannerOptions implements Scanner {
 
   @Override
   public synchronized int getBatchSize() {
+    ensureOpen();
     return size;
   }
 
   @Override
   public synchronized Iterator<Entry<Key,Value>> iterator() {
-    return new ScannerIterator(context, tableId, authorizations, range, size,
-        getTimeout(TimeUnit.SECONDS), this, isolated, readaheadThreshold);
+    ensureOpen();
+    ScannerIterator iter = new ScannerIterator(context, tableId, authorizations, range, size,
+        getTimeout(TimeUnit.SECONDS), this, isolated, readaheadThreshold, new Reporter());
+
+    iters.put(iter, iterCount++);
+
+    return iter;
   }
 
   @Override
   public Authorizations getAuthorizations() {
+    ensureOpen();
     return authorizations;
   }
 
   @Override
   public synchronized void enableIsolation() {
+    ensureOpen();
     this.isolated = true;
   }
 
   @Override
   public synchronized void disableIsolation() {
+    ensureOpen();
     this.isolated = false;
   }
 
   @Override
   public synchronized void setReadaheadThreshold(long batches) {
+    ensureOpen();
     if (batches < 0) {
       throw new IllegalArgumentException(
           "Number of batches before read-ahead must be non-negative");
@@ -124,6 +187,17 @@ public class ScannerImpl extends ScannerOptions implements Scanner {
 
   @Override
   public synchronized long getReadaheadThreshold() {
+    ensureOpen();
     return readaheadThreshold;
   }
+
+  @Override
+  public synchronized void close() {
+    if (!closed) {
+      iters.forEach((iter, v) -> iter.close());
+      iters.clear();
+    }
+
+    closed = true;
+  }
 }
diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerIterator.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerIterator.java
index 3dafb75..de65065 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerIterator.java
@@ -47,7 +47,7 @@ public class ScannerIterator implements Iterator<Entry<Key,Value>>
{
 
   // scanner state
   private Iterator<KeyValue> iter;
-  private ScanState scanState;
+  private final ScanState scanState;
 
   private ScannerOptions options;
 
@@ -58,18 +58,24 @@ public class ScannerIterator implements Iterator<Entry<Key,Value>>
{
   private long batchCount = 0;
   private long readaheadThreshold;
 
+  private ScannerImpl.Reporter reporter;
+
   private static ThreadPoolExecutor readaheadPool = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
3L,
       TimeUnit.SECONDS, new SynchronousQueue<>(),
       new NamingThreadFactory("Accumulo scanner read ahead thread"));
 
+  private boolean closed = false;
+
   ScannerIterator(ClientContext context, Table.ID tableId, Authorizations authorizations,
       Range range, int size, long timeOut, ScannerOptions options, boolean isolated,
-      long readaheadThreshold) {
+      long readaheadThreshold, ScannerImpl.Reporter reporter) {
     this.timeOut = timeOut;
     this.readaheadThreshold = readaheadThreshold;
 
     this.options = new ScannerOptions(options);
 
+    this.reporter = reporter;
+
     if (this.options.fetchedColumns.size() > 0) {
       range = range.bound(this.options.fetchedColumns.first(), this.options.fetchedColumns.last());
     }
@@ -99,6 +105,7 @@ public class ScannerIterator implements Iterator<Entry<Key,Value>>
{
     iter = getNextBatch().iterator();
     if (!iter.hasNext()) {
       finished = true;
+      reporter.finished(this);
       return false;
     }
 
@@ -112,18 +119,38 @@ public class ScannerIterator implements Iterator<Entry<Key,Value>>
{
     throw new NoSuchElementException();
   }
 
+  void close() {
+    // run actual close operation in the background so this does not block.
+    readaheadPool.execute(() -> {
+      synchronized (scanState) {
+        // this is synchronized so its mutually exclusive with readBatch()
+        closed = true;
+        ThriftScanner.close(scanState);
+      }
+    });
+  }
+
   private void initiateReadAhead() {
     Preconditions.checkState(readAheadOperation == null);
     readAheadOperation = readaheadPool.submit(this::readBatch);
   }
 
   private List<KeyValue> readBatch() throws Exception {
+
     List<KeyValue> batch;
 
     do {
-      batch = ThriftScanner.scan(scanState.context, scanState, timeOut);
+      synchronized (scanState) {
+        // this is synchronized so its mutually exclusive with closing
+        Preconditions.checkState(!closed, "Scanner was closed");
+        batch = ThriftScanner.scan(scanState.context, scanState, timeOut);
+      }
     } while (batch != null && batch.size() == 0);
 
+    if (batch != null) {
+      reporter.readBatch(this);
+    }
+
     return batch == null ? Collections.emptyList() : batch;
   }
 
diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java
index 61af620..e7ccd7f 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java
@@ -557,4 +557,24 @@ public class ThriftScanner {
       Thread.currentThread().setName(old);
     }
   }
+
+  static void close(ScanState scanState) {
+    if (!scanState.finished && scanState.scanID != null && scanState.prevLoc
!= null) {
+      TInfo tinfo = Tracer.traceInfo();
+
+      log.debug("Closing active scan {} {}", scanState.prevLoc, scanState.scanID);
+      HostAndPort parsedLocation = HostAndPort.fromString(scanState.prevLoc.tablet_location);
+      TabletClientService.Client client = null;
+      try {
+        client = ThriftUtil.getTServerClient(parsedLocation, scanState.context);
+        client.closeScan(tinfo, scanState.scanID);
+      } catch (TException e) {
+        // ignore this is a best effort
+        log.debug("Failed to close active scan " + scanState.prevLoc + " " + scanState.scanID,
e);
+      } finally {
+        if (client != null)
+          ThriftUtil.returnClient(client);
+      }
+    }
+  }
 }
diff --git a/test/src/main/java/org/apache/accumulo/test/CloseScannerIT.java b/test/src/main/java/org/apache/accumulo/test/CloseScannerIT.java
new file mode 100644
index 0000000..4594230
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/CloseScannerIT.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.test;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.List;
+
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.IsolatedScanner;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.test.functional.ReadWriteIT;
+import org.junit.Test;
+
+import com.google.common.collect.Iterators;
+
+public class CloseScannerIT extends AccumuloClusterHarness {
+
+  static final int ROWS = 1000;
+  static final int COLS = 1000;
+
+  @Test
+  public void testManyScans() throws Exception {
+
+    try (AccumuloClient client = createAccumuloClient()) {
+      String tableName = getUniqueNames(1)[0];
+
+      client.tableOperations().create(tableName);
+
+      ReadWriteIT.ingest(client, getClientInfo(), ROWS, COLS, 50, 0, tableName);
+
+      client.tableOperations().flush(tableName, null, null, true);
+
+      for (int i = 0; i < 200; i++) {
+        try (Scanner scanner = createScanner(client, tableName, i)) {
+          scanner.setRange(new Range());
+          scanner.setReadaheadThreshold(i % 2 == 0 ? 0 : 3);
+
+          for (int j = 0; j < i % 7 + 1; j++) {
+            // only read a little data and quit, this should leave a session open on the
tserver
+            Iterators.get(scanner.iterator(), 10);
+          }
+        } // when the scanner is closed, all open sessions should be closed
+      }
+
+      List<String> tservers = client.instanceOperations().getTabletServers();
+      int activeScans = 0;
+      for (String tserver : tservers) {
+        activeScans += client.instanceOperations().getActiveScans(tserver).size();
+      }
+
+      assertTrue(activeScans < 3);
+    }
+  }
+
+  private static Scanner createScanner(AccumuloClient client, String tableName, int i)
+      throws Exception {
+    Scanner scanner = client.createScanner(tableName, Authorizations.EMPTY);
+    if (i % 2 == 0)
+      scanner = new IsolatedScanner(scanner);
+    return scanner;
+  }
+}


Mime
View raw message