From commits-return-22515-archive-asf-public=cust-asf.ponee.io@accumulo.apache.org Fri Jan 18 00:11:17 2019 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 73A1B18063F for ; Fri, 18 Jan 2019 00:11:16 +0100 (CET) Received: (qmail 83585 invoked by uid 500); 17 Jan 2019 23:11:15 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 83576 invoked by uid 99); 17 Jan 2019 23:11:15 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 17 Jan 2019 23:11:15 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id F2D438760E; Thu, 17 Jan 2019 23:11:14 +0000 (UTC) Date: Thu, 17 Jan 2019 23:11:14 +0000 To: "commits@accumulo.apache.org" Subject: [accumulo] branch master updated: fixes #813 close server side session when scanner closed (#905) MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <154776667457.27818.14185848236091431619@gitbox.apache.org> From: kturner@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: accumulo X-Git-Refname: refs/heads/master X-Git-Reftype: branch X-Git-Oldrev: 6481ccca5486f5aa8c475fb2a9479bea96034368 X-Git-Newrev: 7bc5d326342ad5673e330e66f513b8cf3b5531e0 X-Git-Rev: 7bc5d326342ad5673e330e66f513b8cf3b5531e0 X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/accumulo.git The following commit(s) were added to refs/heads/master by this push: new 7bc5d32 fixes #813 close server side session when scanner closed (#905) 7bc5d32 is described below commit 7bc5d326342ad5673e330e66f513b8cf3b5531e0 Author: Keith Turner AuthorDate: Thu Jan 17 18:11:10 2019 -0500 fixes #813 close server side session when scanner closed (#905) --- .../core/client/ClientSideIteratorScanner.java | 5 ++ .../accumulo/core/client/IsolatedScanner.java | 5 ++ .../accumulo/core/clientImpl/ScannerImpl.java | 78 ++++++++++++++++++++- .../accumulo/core/clientImpl/ScannerIterator.java | 33 ++++++++- .../accumulo/core/clientImpl/ThriftScanner.java | 20 ++++++ .../org/apache/accumulo/test/CloseScannerIT.java | 81 ++++++++++++++++++++++ 6 files changed, 217 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java b/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java index e37f4fe..789277b 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java +++ b/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java @@ -382,4 +382,9 @@ public class ClientSideIteratorScanner extends ScannerOptions implements Scanner public SamplerConfiguration getIteratorSamplerConfiguration() { return iteratorSamplerConfig; } + + @Override + public void close() { + smi.scanner.close(); + } } diff --git a/core/src/main/java/org/apache/accumulo/core/client/IsolatedScanner.java b/core/src/main/java/org/apache/accumulo/core/client/IsolatedScanner.java index c7e09ef..a98d91b 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/IsolatedScanner.java +++ b/core/src/main/java/org/apache/accumulo/core/client/IsolatedScanner.java @@ -283,4 +283,9 @@ public class IsolatedScanner extends ScannerOptions implements Scanner { this.readaheadThreshold = batches; } + + @Override + public void close() { + scanner.close(); + } } diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerImpl.java index 3d38bb3..4b865f3 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerImpl.java @@ -19,6 +19,8 @@ package org.apache.accumulo.core.clientImpl; import static com.google.common.base.Preconditions.checkArgument; import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.TimeUnit; @@ -55,6 +57,54 @@ public class ScannerImpl extends ScannerOptions implements Scanner { private boolean isolated = false; private long readaheadThreshold = Constants.SCANNER_DEFAULT_READAHEAD_THRESHOLD; + boolean closed = false; + + private static final int MAX_ENTRIES = 16; + + private long iterCount = 0; + + // Create an LRU map of iterators that tracks the MAX_ENTRIES most recently used iterators. An LRU + // map is used to support the use case of a long lived scanner that constantly creates iterators + // and does not read all of the data. For this case do not want iterator tracking to consume too + // much memory. Also it would be best to avoid an RPC storm of close methods for thousands + // sessions that may have timed out. + private Map iters = new LinkedHashMap(MAX_ENTRIES + 1, + .75F, true) { + private static final long serialVersionUID = 1L; + + // This method is called just after a new entry has been added + @Override + public boolean removeEldestEntry(Map.Entry eldest) { + return size() > MAX_ENTRIES; + } + }; + + /** + * This is used for ScannerIterators to report their activity back to the scanner that created + * them. + */ + class Reporter { + + void readBatch(ScannerIterator iter) { + synchronized (ScannerImpl.this) { + // This iter just had some activity, so access it in map so it becomes the most recently + // used. + iters.get(iter); + } + } + + void finished(ScannerIterator iter) { + synchronized (ScannerImpl.this) { + iters.remove(iter); + } + } + } + + private synchronized void ensureOpen() { + if (closed) + throw new IllegalArgumentException("Scanner is closed"); + } + public ScannerImpl(ClientContext context, Table.ID tableId, Authorizations authorizations) { checkArgument(context != null, "context is null"); checkArgument(tableId != null, "tableId is null"); @@ -69,17 +119,20 @@ public class ScannerImpl extends ScannerOptions implements Scanner { @Override public synchronized void setRange(Range range) { + ensureOpen(); checkArgument(range != null, "range is null"); this.range = range; } @Override public synchronized Range getRange() { + ensureOpen(); return range; } @Override public synchronized void setBatchSize(int size) { + ensureOpen(); if (size > 0) this.size = size; else @@ -88,32 +141,42 @@ public class ScannerImpl extends ScannerOptions implements Scanner { @Override public synchronized int getBatchSize() { + ensureOpen(); return size; } @Override public synchronized Iterator> iterator() { - return new ScannerIterator(context, tableId, authorizations, range, size, - getTimeout(TimeUnit.SECONDS), this, isolated, readaheadThreshold); + ensureOpen(); + ScannerIterator iter = new ScannerIterator(context, tableId, authorizations, range, size, + getTimeout(TimeUnit.SECONDS), this, isolated, readaheadThreshold, new Reporter()); + + iters.put(iter, iterCount++); + + return iter; } @Override public Authorizations getAuthorizations() { + ensureOpen(); return authorizations; } @Override public synchronized void enableIsolation() { + ensureOpen(); this.isolated = true; } @Override public synchronized void disableIsolation() { + ensureOpen(); this.isolated = false; } @Override public synchronized void setReadaheadThreshold(long batches) { + ensureOpen(); if (batches < 0) { throw new IllegalArgumentException( "Number of batches before read-ahead must be non-negative"); @@ -124,6 +187,17 @@ public class ScannerImpl extends ScannerOptions implements Scanner { @Override public synchronized long getReadaheadThreshold() { + ensureOpen(); return readaheadThreshold; } + + @Override + public synchronized void close() { + if (!closed) { + iters.forEach((iter, v) -> iter.close()); + iters.clear(); + } + + closed = true; + } } diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerIterator.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerIterator.java index 3dafb75..de65065 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerIterator.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerIterator.java @@ -47,7 +47,7 @@ public class ScannerIterator implements Iterator> { // scanner state private Iterator iter; - private ScanState scanState; + private final ScanState scanState; private ScannerOptions options; @@ -58,18 +58,24 @@ public class ScannerIterator implements Iterator> { private long batchCount = 0; private long readaheadThreshold; + private ScannerImpl.Reporter reporter; + private static ThreadPoolExecutor readaheadPool = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 3L, TimeUnit.SECONDS, new SynchronousQueue<>(), new NamingThreadFactory("Accumulo scanner read ahead thread")); + private boolean closed = false; + ScannerIterator(ClientContext context, Table.ID tableId, Authorizations authorizations, Range range, int size, long timeOut, ScannerOptions options, boolean isolated, - long readaheadThreshold) { + long readaheadThreshold, ScannerImpl.Reporter reporter) { this.timeOut = timeOut; this.readaheadThreshold = readaheadThreshold; this.options = new ScannerOptions(options); + this.reporter = reporter; + if (this.options.fetchedColumns.size() > 0) { range = range.bound(this.options.fetchedColumns.first(), this.options.fetchedColumns.last()); } @@ -99,6 +105,7 @@ public class ScannerIterator implements Iterator> { iter = getNextBatch().iterator(); if (!iter.hasNext()) { finished = true; + reporter.finished(this); return false; } @@ -112,18 +119,38 @@ public class ScannerIterator implements Iterator> { throw new NoSuchElementException(); } + void close() { + // run actual close operation in the background so this does not block. + readaheadPool.execute(() -> { + synchronized (scanState) { + // this is synchronized so its mutually exclusive with readBatch() + closed = true; + ThriftScanner.close(scanState); + } + }); + } + private void initiateReadAhead() { Preconditions.checkState(readAheadOperation == null); readAheadOperation = readaheadPool.submit(this::readBatch); } private List readBatch() throws Exception { + List batch; do { - batch = ThriftScanner.scan(scanState.context, scanState, timeOut); + synchronized (scanState) { + // this is synchronized so its mutually exclusive with closing + Preconditions.checkState(!closed, "Scanner was closed"); + batch = ThriftScanner.scan(scanState.context, scanState, timeOut); + } } while (batch != null && batch.size() == 0); + if (batch != null) { + reporter.readBatch(this); + } + return batch == null ? Collections.emptyList() : batch; } diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java index 61af620..e7ccd7f 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java @@ -557,4 +557,24 @@ public class ThriftScanner { Thread.currentThread().setName(old); } } + + static void close(ScanState scanState) { + if (!scanState.finished && scanState.scanID != null && scanState.prevLoc != null) { + TInfo tinfo = Tracer.traceInfo(); + + log.debug("Closing active scan {} {}", scanState.prevLoc, scanState.scanID); + HostAndPort parsedLocation = HostAndPort.fromString(scanState.prevLoc.tablet_location); + TabletClientService.Client client = null; + try { + client = ThriftUtil.getTServerClient(parsedLocation, scanState.context); + client.closeScan(tinfo, scanState.scanID); + } catch (TException e) { + // ignore this is a best effort + log.debug("Failed to close active scan " + scanState.prevLoc + " " + scanState.scanID, e); + } finally { + if (client != null) + ThriftUtil.returnClient(client); + } + } + } } diff --git a/test/src/main/java/org/apache/accumulo/test/CloseScannerIT.java b/test/src/main/java/org/apache/accumulo/test/CloseScannerIT.java new file mode 100644 index 0000000..4594230 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/CloseScannerIT.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.accumulo.test; + +import static org.junit.Assert.assertTrue; + +import java.util.List; + +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.IsolatedScanner; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.accumulo.test.functional.ReadWriteIT; +import org.junit.Test; + +import com.google.common.collect.Iterators; + +public class CloseScannerIT extends AccumuloClusterHarness { + + static final int ROWS = 1000; + static final int COLS = 1000; + + @Test + public void testManyScans() throws Exception { + + try (AccumuloClient client = createAccumuloClient()) { + String tableName = getUniqueNames(1)[0]; + + client.tableOperations().create(tableName); + + ReadWriteIT.ingest(client, getClientInfo(), ROWS, COLS, 50, 0, tableName); + + client.tableOperations().flush(tableName, null, null, true); + + for (int i = 0; i < 200; i++) { + try (Scanner scanner = createScanner(client, tableName, i)) { + scanner.setRange(new Range()); + scanner.setReadaheadThreshold(i % 2 == 0 ? 0 : 3); + + for (int j = 0; j < i % 7 + 1; j++) { + // only read a little data and quit, this should leave a session open on the tserver + Iterators.get(scanner.iterator(), 10); + } + } // when the scanner is closed, all open sessions should be closed + } + + List tservers = client.instanceOperations().getTabletServers(); + int activeScans = 0; + for (String tserver : tservers) { + activeScans += client.instanceOperations().getActiveScans(tserver).size(); + } + + assertTrue(activeScans < 3); + } + } + + private static Scanner createScanner(AccumuloClient client, String tableName, int i) + throws Exception { + Scanner scanner = client.createScanner(tableName, Authorizations.EMPTY); + if (i % 2 == 0) + scanner = new IsolatedScanner(scanner); + return scanner; + } +}