Merge branch '1.6' into 1.7
Conflicts:
server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
test/src/test/java/org/apache/accumulo/test/functional/ScanSessionTimeOutIT.java
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/65628282
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/65628282
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/65628282
Branch: refs/heads/1.7
Commit: 656282825ad0eb4ee51052e71492a3d3fd5c1f02
Parents: 642add8 46ad836
Author: Josh Elser <elserj@apache.org>
Authored: Tue Jan 12 00:24:24 2016 -0500
Committer: Josh Elser <elserj@apache.org>
Committed: Tue Jan 12 00:24:24 2016 -0500
----------------------------------------------------------------------
.../org/apache/accumulo/core/conf/Property.java | 6 +-
.../tserver/session/ConditionalSession.java | 3 +-
.../tserver/session/MultiScanSession.java | 4 +-
.../accumulo/tserver/session/ScanSession.java | 6 +-
.../accumulo/tserver/session/Session.java | 4 +-
.../tserver/session/SessionManager.java | 61 ++++++-
.../apache/accumulo/tserver/tablet/Scanner.java | 68 +++++--
.../test/functional/ScanSessionTimeOutIT.java | 15 +-
.../test/functional/SessionBlockVerifyIT.java | 176 +++++++++++++++++++
9 files changed, 306 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/65628282/core/src/main/java/org/apache/accumulo/core/conf/Property.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/65628282/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ConditionalSession.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/session/ConditionalSession.java
index cd5e617,0000000..138f558
mode 100644,000000..100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ConditionalSession.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ConditionalSession.java
@@@ -1,44 -1,0 +1,45 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.session;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.accumulo.core.client.Durability;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.thrift.TCredentials;
+
+public class ConditionalSession extends Session {
+ public final TCredentials credentials;
+ public final Authorizations auths;
+ public final String tableId;
+ public final AtomicBoolean interruptFlag = new AtomicBoolean();
+ public final Durability durability;
+
+ public ConditionalSession(TCredentials credentials, Authorizations authorizations, String
tableId, Durability durability) {
+ super(credentials);
+ this.credentials = credentials;
+ this.auths = authorizations;
+ this.tableId = tableId;
+ this.durability = durability;
+ }
+
+ @Override
- public void cleanup() {
++ public boolean cleanup() {
+ interruptFlag.set(true);
++ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/65628282/server/tserver/src/main/java/org/apache/accumulo/tserver/session/MultiScanSession.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/session/MultiScanSession.java
index b326e10,0000000..2fd590c
mode 100644,000000..100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/MultiScanSession.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/MultiScanSession.java
@@@ -1,63 -1,0 +1,65 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.session;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.accumulo.core.data.Column;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.impl.KeyExtent;
+import org.apache.accumulo.core.data.thrift.IterInfo;
+import org.apache.accumulo.core.data.thrift.MultiScanResult;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.thrift.TCredentials;
+import org.apache.accumulo.tserver.scan.ScanTask;
+
+public class MultiScanSession extends Session {
+ public final KeyExtent threadPoolExtent;
+ public final HashSet<Column> columnSet = new HashSet<Column>();
+ public final Map<KeyExtent,List<Range>> queries;
+ public final List<IterInfo> ssiList;
+ public final Map<String,Map<String,String>> ssio;
+ public final Authorizations auths;
+
+ // stats
+ public int numRanges;
+ public int numTablets;
+ public int numEntries;
+ public long totalLookupTime;
+
+ public volatile ScanTask<MultiScanResult> lookupTask;
+
+ public MultiScanSession(TCredentials credentials, KeyExtent threadPoolExtent, Map<KeyExtent,List<Range>>
queries, List<IterInfo> ssiList,
+ Map<String,Map<String,String>> ssio, Authorizations authorizations) {
+ super(credentials);
+ this.queries = queries;
+ this.ssiList = ssiList;
+ this.ssio = ssio;
+ this.auths = authorizations;
+ this.threadPoolExtent = threadPoolExtent;
+ }
+
+ @Override
- public void cleanup() {
++ public boolean cleanup() {
+ if (lookupTask != null)
+ lookupTask.cancel(true);
++ // the cancellation should provide us the safety to return true here
++ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/65628282/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java
index d5b0027,0000000..743f4d3
mode 100644,000000..100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java
@@@ -1,70 -1,0 +1,72 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.session;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.accumulo.core.data.Column;
+import org.apache.accumulo.core.data.impl.KeyExtent;
+import org.apache.accumulo.core.data.thrift.IterInfo;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.thrift.TCredentials;
+import org.apache.accumulo.core.util.Stat;
+import org.apache.accumulo.tserver.scan.ScanTask;
+import org.apache.accumulo.tserver.tablet.ScanBatch;
+import org.apache.accumulo.tserver.tablet.Scanner;
+
+public class ScanSession extends Session {
+ public final Stat nbTimes = new Stat();
+ public final KeyExtent extent;
+ public final Set<Column> columnSet;
+ public final List<IterInfo> ssiList;
+ public final Map<String,Map<String,String>> ssio;
+ public final Authorizations auths;
+ public final AtomicBoolean interruptFlag = new AtomicBoolean();
+ public long entriesReturned = 0;
+ public long batchCount = 0;
+ public volatile ScanTask<ScanBatch> nextBatchTask;
+ public Scanner scanner;
+ public final long readaheadThreshold;
+
+ public ScanSession(TCredentials credentials, KeyExtent extent, Set<Column> columnSet,
List<IterInfo> ssiList, Map<String,Map<String,String>> ssio,
+ Authorizations authorizations, long readaheadThreshold) {
+ super(credentials);
+ this.extent = extent;
+ this.columnSet = columnSet;
+ this.ssiList = ssiList;
+ this.ssio = ssio;
+ this.auths = authorizations;
+ this.readaheadThreshold = readaheadThreshold;
+ }
+
+ @Override
- public void cleanup() {
++ public boolean cleanup() {
+ try {
+ if (nextBatchTask != null)
+ nextBatchTask.cancel(true);
+ } finally {
+ if (scanner != null)
- scanner.close();
++ return scanner.close();
++ else
++ return true;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/65628282/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java
index a561166,0000000..1d2d88d
mode 100644,000000..100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java
@@@ -1,43 -1,0 +1,45 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.session;
+
+import org.apache.accumulo.core.security.thrift.TCredentials;
+import org.apache.accumulo.server.rpc.TServerUtils;
+
+public class Session {
+ public final String client;
+ long lastAccessTime;
+ public long startTime;
+ boolean reserved;
+ private final TCredentials credentials;
+
+ Session(TCredentials credentials) {
+ this.credentials = credentials;
+ this.client = TServerUtils.clientAddress.get();
+ }
+
+ public String getUser() {
+ return credentials.getPrincipal();
+ }
+
+ public TCredentials getCredentials() {
+ return credentials;
+ }
+
- public void cleanup() {}
++ public boolean cleanup() {
++ return true;
++ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/65628282/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
index 83d87b3,0000000..1cd7fa6
mode 100644,000000..100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
@@@ -1,319 -1,0 +1,362 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.session;
+
+import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.HashMap;
++import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
++import java.util.Set;
+import java.util.TimerTask;
+
+import org.apache.accumulo.core.client.impl.Translator;
+import org.apache.accumulo.core.client.impl.Translators;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.thrift.MultiScanResult;
+import org.apache.accumulo.core.tabletserver.thrift.ActiveScan;
+import org.apache.accumulo.core.tabletserver.thrift.ScanState;
+import org.apache.accumulo.core.tabletserver.thrift.ScanType;
+import org.apache.accumulo.core.util.MapCounter;
+import org.apache.accumulo.server.util.time.SimpleTimer;
+import org.apache.accumulo.tserver.scan.ScanRunState;
+import org.apache.accumulo.tserver.scan.ScanTask;
+import org.apache.accumulo.tserver.tablet.ScanBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
++import com.google.common.collect.Iterables;
++import com.google.common.collect.Maps;
++
+public class SessionManager {
+ private static final Logger log = LoggerFactory.getLogger(SessionManager.class);
+
+ private final SecureRandom random = new SecureRandom();
+ private final Map<Long,Session> sessions = new HashMap<Long,Session>();
+ private final long maxIdle;
++ private final long maxUpdateIdle;
++ private final List<Session> idleSessions = new ArrayList<Session>();
++ private final Long expiredSessionMarker = new Long(-1);
+ private final AccumuloConfiguration aconf;
+
+ public SessionManager(AccumuloConfiguration conf) {
+ aconf = conf;
++ maxUpdateIdle = conf.getTimeInMillis(Property.TSERV_UPDATE_SESSION_MAXIDLE);
+ maxIdle = conf.getTimeInMillis(Property.TSERV_SESSION_MAXIDLE);
+
+ Runnable r = new Runnable() {
+ @Override
+ public void run() {
- sweep(maxIdle);
++ sweep(maxIdle, maxUpdateIdle);
+ }
+ };
+
+ SimpleTimer.getInstance(conf).schedule(r, 0, Math.max(maxIdle / 2, 1000));
+ }
+
+ public synchronized long createSession(Session session, boolean reserve) {
+ long sid = random.nextLong();
+
+ while (sessions.containsKey(sid)) {
+ sid = random.nextLong();
+ }
+
+ sessions.put(sid, session);
+
+ session.reserved = reserve;
+
+ session.startTime = session.lastAccessTime = System.currentTimeMillis();
+
+ return sid;
+ }
+
+ public long getMaxIdleTime() {
+ return maxIdle;
+ }
+
+ /**
+ * while a session is reserved, it cannot be canceled or removed
+ */
+
+ public synchronized Session reserveSession(long sessionId) {
+ Session session = sessions.get(sessionId);
+ if (session != null) {
+ if (session.reserved)
+ throw new IllegalStateException();
+ session.reserved = true;
+ }
+
+ return session;
+
+ }
+
+ public synchronized Session reserveSession(long sessionId, boolean wait) {
+ Session session = sessions.get(sessionId);
+ if (session != null) {
+ while (wait && session.reserved) {
+ try {
+ wait(1000);
+ } catch (InterruptedException e) {
+ throw new RuntimeException();
+ }
+ }
+
+ if (session.reserved)
+ throw new IllegalStateException();
+ session.reserved = true;
+ }
+
+ return session;
+
+ }
+
+ public synchronized void unreserveSession(Session session) {
+ if (!session.reserved)
+ throw new IllegalStateException();
+ notifyAll();
+ session.reserved = false;
+ session.lastAccessTime = System.currentTimeMillis();
+ }
+
+ public synchronized void unreserveSession(long sessionId) {
+ Session session = getSession(sessionId);
+ if (session != null)
+ unreserveSession(session);
+ }
+
+ public synchronized Session getSession(long sessionId) {
+ Session session = sessions.get(sessionId);
+ if (session != null)
+ session.lastAccessTime = System.currentTimeMillis();
+ return session;
+ }
+
+ public Session removeSession(long sessionId) {
+ return removeSession(sessionId, false);
+ }
+
+ public Session removeSession(long sessionId, boolean unreserve) {
+ Session session = null;
+ synchronized (this) {
+ session = sessions.remove(sessionId);
+ if (unreserve && session != null)
+ unreserveSession(session);
+ }
+
+ // do clean up out side of lock..
+ if (session != null)
+ session.cleanup();
+
+ return session;
+ }
+
- private void sweep(long maxIdle) {
++ private void sweep(final long maxIdle, final long maxUpdateIdle) {
+ List<Session> sessionsToCleanup = new ArrayList<Session>();
+ synchronized (this) {
+ Iterator<Session> iter = sessions.values().iterator();
+ while (iter.hasNext()) {
+ Session session = iter.next();
++ long configuredIdle = maxIdle;
++ if (session instanceof UpdateSession) {
++ configuredIdle = maxUpdateIdle;
++ }
+ long idleTime = System.currentTimeMillis() - session.lastAccessTime;
- if (idleTime > maxIdle && !session.reserved) {
++ if (idleTime > configuredIdle && !session.reserved) {
+ log.info("Closing idle session from user=" + session.getUser() + ", client=" +
session.client + ", idle=" + idleTime + "ms");
+ iter.remove();
+ sessionsToCleanup.add(session);
+ }
+ }
+ }
+
- // do clean up outside of lock
- for (Session session : sessionsToCleanup) {
- session.cleanup();
++ // do clean up outside of the session lock, in a synchronized block for simplicity
instead of using a synchronized list
++
++ synchronized (idleSessions) {
++
++ sessionsToCleanup.addAll(idleSessions);
++
++ idleSessions.clear();
++
++ // perform cleanup for all of the sessions
++ for (Session session : sessionsToCleanup) {
++ if (!session.cleanup())
++ idleSessions.add(session);
++ }
+ }
+ }
+
+ public synchronized void removeIfNotAccessed(final long sessionId, final long delay) {
+ Session session = sessions.get(sessionId);
+ if (session != null) {
+ final long removeTime = session.lastAccessTime;
+ TimerTask r = new TimerTask() {
+ @Override
+ public void run() {
+ Session sessionToCleanup = null;
+ synchronized (SessionManager.this) {
+ Session session2 = sessions.get(sessionId);
+ if (session2 != null && session2.lastAccessTime == removeTime &&
!session2.reserved) {
+ log.info("Closing not accessed session from user=" + session2.getUser() +
", client=" + session2.client + ", duration=" + delay + "ms");
+ sessions.remove(sessionId);
+ sessionToCleanup = session2;
+ }
+ }
+
+ // call clean up outside of lock
+ if (sessionToCleanup != null)
+ sessionToCleanup.cleanup();
+ }
+ };
+
+ SimpleTimer.getInstance(aconf).schedule(r, delay);
+ }
+ }
+
+ public synchronized Map<String,MapCounter<ScanRunState>> getActiveScansPerTable()
{
+ Map<String,MapCounter<ScanRunState>> counts = new HashMap<String,MapCounter<ScanRunState>>();
++ Set<Entry<Long,Session>> copiedIdleSessions = new HashSet<Entry<Long,Session>>();
++
++ synchronized (idleSessions) {
++ /**
++ * Add the idle sessions so that they get returned in the active scans call
++ */
++ for (Session session : idleSessions) {
++ copiedIdleSessions.add(Maps.immutableEntry(expiredSessionMarker, session));
++ }
++ }
++
+ for (Entry<Long,Session> entry : sessions.entrySet()) {
+
+ Session session = entry.getValue();
+ @SuppressWarnings("rawtypes")
+ ScanTask nbt = null;
+ String tableID = null;
+
+ if (session instanceof ScanSession) {
+ ScanSession ss = (ScanSession) session;
+ nbt = ss.nextBatchTask;
+ tableID = ss.extent.getTableId().toString();
+ } else if (session instanceof MultiScanSession) {
+ MultiScanSession mss = (MultiScanSession) session;
+ nbt = mss.lookupTask;
+ tableID = mss.threadPoolExtent.getTableId().toString();
+ }
+
+ if (nbt == null)
+ continue;
+
+ ScanRunState srs = nbt.getScanRunState();
+
+ if (srs == ScanRunState.FINISHED)
+ continue;
+
+ MapCounter<ScanRunState> stateCounts = counts.get(tableID);
+ if (stateCounts == null) {
+ stateCounts = new MapCounter<ScanRunState>();
+ counts.put(tableID, stateCounts);
+ }
+
+ stateCounts.increment(srs, 1);
+ }
+
+ return counts;
+ }
+
+ public synchronized List<ActiveScan> getActiveScans() {
+
- List<ActiveScan> activeScans = new ArrayList<ActiveScan>();
++ final List<ActiveScan> activeScans = new ArrayList<ActiveScan>();
++ final long ct = System.currentTimeMillis();
++ final Set<Entry<Long,Session>> copiedIdleSessions = new HashSet<Entry<Long,Session>>();
+
- long ct = System.currentTimeMillis();
++ synchronized (idleSessions) {
++ /**
++ * Add the idle sessions so that they get returned in the active scans call
++ */
++ for (Session session : idleSessions) {
++ copiedIdleSessions.add(Maps.immutableEntry(expiredSessionMarker, session));
++ }
++ }
+
- for (Entry<Long,Session> entry : sessions.entrySet()) {
++ for (Entry<Long,Session> entry : Iterables.concat(sessions.entrySet(), copiedIdleSessions))
{
+ Session session = entry.getValue();
+ if (session instanceof ScanSession) {
+ ScanSession ss = (ScanSession) session;
+
+ ScanState state = ScanState.RUNNING;
+
+ ScanTask<ScanBatch> nbt = ss.nextBatchTask;
+ if (nbt == null) {
+ state = ScanState.IDLE;
+ } else {
+ switch (nbt.getScanRunState()) {
+ case QUEUED:
+ state = ScanState.QUEUED;
+ break;
+ case FINISHED:
+ state = ScanState.IDLE;
+ break;
+ case RUNNING:
+ default:
+ /* do nothing */
+ break;
+ }
+ }
+
+ ActiveScan activeScan = new ActiveScan(ss.client, ss.getUser(), ss.extent.getTableId().toString(),
ct - ss.startTime, ct - ss.lastAccessTime,
+ ScanType.SINGLE, state, ss.extent.toThrift(), Translator.translate(ss.columnSet,
Translators.CT), ss.ssiList, ss.ssio,
+ ss.auths.getAuthorizationsBB());
+
+ // scanId added by ACCUMULO-2641 is an optional thrift argument and not available
in ActiveScan constructor
+ activeScan.setScanId(entry.getKey());
+ activeScans.add(activeScan);
+
+ } else if (session instanceof MultiScanSession) {
+ MultiScanSession mss = (MultiScanSession) session;
+
+ ScanState state = ScanState.RUNNING;
+
+ ScanTask<MultiScanResult> nbt = mss.lookupTask;
+ if (nbt == null) {
+ state = ScanState.IDLE;
+ } else {
+ switch (nbt.getScanRunState()) {
+ case QUEUED:
+ state = ScanState.QUEUED;
+ break;
+ case FINISHED:
+ state = ScanState.IDLE;
+ break;
+ case RUNNING:
+ default:
+ /* do nothing */
+ break;
+ }
+ }
+
+ activeScans.add(new ActiveScan(mss.client, mss.getUser(), mss.threadPoolExtent.getTableId().toString(),
ct - mss.startTime, ct - mss.lastAccessTime,
+ ScanType.BATCH, state, mss.threadPoolExtent.toThrift(), Translator.translate(mss.columnSet,
Translators.CT), mss.ssiList, mss.ssio, mss.auths
+ .getAuthorizationsBB()));
+ }
+ }
+
+ return activeScans;
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/65628282/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java
index 790a352,0000000..c96c75a
mode 100644,000000..100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java
@@@ -1,137 -1,0 +1,167 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.tablet;
+
+import java.io.IOException;
+import java.util.ArrayList;
++import java.util.concurrent.Semaphore;
++import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IterationInterruptedException;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Scanner {
+ private static final Logger log = LoggerFactory.getLogger(Scanner.class);
+
+ private final Tablet tablet;
+ private final ScanOptions options;
+ private Range range;
+ private SortedKeyValueIterator<Key,Value> isolatedIter;
+ private ScanDataSource isolatedDataSource;
+ private boolean sawException = false;
+ private boolean scanClosed = false;
++ /**
++ * A fair semaphore of one is used since we explicitly know the access pattern will be one
thread to read and another to call close if the session becomes idle.
++ * Since we're explicitly preventing re-entrance, we're currently using a Semaphore. If
at any point we decide read needs to be re-entrant, we can switch to a
++ * Reentrant lock.
++ */
++ private Semaphore scannerSemaphore;
+
+ Scanner(Tablet tablet, Range range, ScanOptions options) {
+ this.tablet = tablet;
+ this.range = range;
+ this.options = options;
++ this.scannerSemaphore = new Semaphore(1, true);
+ }
+
- public synchronized ScanBatch read() throws IOException, TabletClosedException {
++ public ScanBatch read() throws IOException, TabletClosedException {
+
- if (sawException)
- throw new IllegalStateException("Tried to use scanner after exception occurred.");
-
- if (scanClosed)
- throw new IllegalStateException("Tried to use scanner after it was closed.");
++ ScanDataSource dataSource = null;
+
+ Batch results = null;
+
- ScanDataSource dataSource;
++ try {
+
- if (options.isIsolated()) {
- if (isolatedDataSource == null)
- isolatedDataSource = new ScanDataSource(tablet, options);
- dataSource = isolatedDataSource;
- } else {
- dataSource = new ScanDataSource(tablet, options);
- }
++ try {
++ scannerSemaphore.acquire();
++ } catch (InterruptedException e) {
++ sawException = true;
++ }
+
- try {
++ // sawException may have occurred within close, so we cannot assume that an interrupted
exception was its cause
++ if (sawException)
++ throw new IllegalStateException("Tried to use scanner after exception occurred.");
++
++ if (scanClosed)
++ throw new IllegalStateException("Tried to use scanner after it was closed.");
++
++ if (options.isIsolated()) {
++ if (isolatedDataSource == null)
++ isolatedDataSource = new ScanDataSource(tablet, options);
++ dataSource = isolatedDataSource;
++ } else {
++ dataSource = new ScanDataSource(tablet, options);
++ }
+
+ SortedKeyValueIterator<Key,Value> iter;
+
+ if (options.isIsolated()) {
+ if (isolatedIter == null)
+ isolatedIter = new SourceSwitchingIterator(dataSource, true);
+ else
+ isolatedDataSource.reattachFileManager();
+ iter = isolatedIter;
+ } else {
+ iter = new SourceSwitchingIterator(dataSource, false);
+ }
+
+ results = tablet.nextBatch(iter, range, options.getNum(), options.getColumnSet());
+
+ if (results.getResults() == null) {
+ range = null;
+ return new ScanBatch(new ArrayList<KVEntry>(), false);
+ } else if (results.getContinueKey() == null) {
+ return new ScanBatch(results.getResults(), false);
+ } else {
+ range = new Range(results.getContinueKey(), !results.isSkipContinueKey(), range.getEndKey(),
range.isEndKeyInclusive());
+ return new ScanBatch(results.getResults(), true);
+ }
+
+ } catch (IterationInterruptedException iie) {
+ sawException = true;
+ if (tablet.isClosed())
+ throw new TabletClosedException(iie);
+ else
+ throw iie;
+ } catch (IOException ioe) {
+ if (tablet.shutdownInProgress()) {
+ log.debug("IOException while shutdown in progress ", ioe);
+ throw new TabletClosedException(ioe); // assume IOException was caused by execution
of HDFS shutdown hook
+ }
+
+ sawException = true;
+ dataSource.close(true);
+ throw ioe;
+ } catch (RuntimeException re) {
+ sawException = true;
+ throw re;
+ } finally {
+ // code in finally block because always want
+ // to return mapfiles, even when exception is thrown
- if (!options.isIsolated()) {
++ if (null != dataSource && !options.isIsolated()) {
+ dataSource.close(false);
- } else {
++ } else if (null != dataSource) {
+ dataSource.detachFileManager();
+ }
+
+ if (results != null && results.getResults() != null)
+ tablet.updateQueryStats(results.getResults().size(), results.getNumBytes());
++
++ scannerSemaphore.release();
+ }
+ }
+
+ // close and read are synchronized because can not call close on the data source while
it is in use
+ // this could lead to the case where file iterators that are in use by a thread are returned
+ // to the pool... this would be bad
- public void close() {
++ public boolean close() {
+ options.getInterruptFlag().set(true);
- synchronized (this) {
++
++ boolean obtainedLock = false;
++ try {
++ obtainedLock = scannerSemaphore.tryAcquire(10, TimeUnit.MILLISECONDS);
++ if (!obtainedLock)
++ return false;
++
+ scanClosed = true;
+ if (isolatedDataSource != null)
+ isolatedDataSource.close(false);
++ } catch (InterruptedException e) {
++ return false;
++ } finally {
++ if (obtainedLock)
++ scannerSemaphore.release();
+ }
++ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/65628282/test/src/test/java/org/apache/accumulo/test/functional/ScanSessionTimeOutIT.java
----------------------------------------------------------------------
diff --cc test/src/test/java/org/apache/accumulo/test/functional/ScanSessionTimeOutIT.java
index daf781f,6009462..cb5bc18
--- a/test/src/test/java/org/apache/accumulo/test/functional/ScanSessionTimeOutIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/ScanSessionTimeOutIT.java
@@@ -49,9 -49,7 +49,9 @@@ public class ScanSessionTimeOutIT exten
@Override
public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite)
{
- cfg.setSiteConfig(Collections.singletonMap(Property.TSERV_SESSION_MAXIDLE.getKey(),
getMaxIdleTimeString()));
+ Map<String,String> siteConfig = cfg.getSiteConfig();
- siteConfig.put(Property.TSERV_SESSION_MAXIDLE.getKey(), "3");
++ siteConfig.put(Property.TSERV_SESSION_MAXIDLE.getKey(), getMaxIdleTimeString());
+ cfg.setSiteConfig(siteConfig);
}
@Override
|