accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject [accumulo] 01/01: Merge branch '1.8'
Date Wed, 14 Feb 2018 23:24:12 GMT
This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit afed8a98d6d7fa69127540165928e18234a584a7
Merge: 1325e81 0bd076e
Author: Keith Turner <kturner@apache.org>
AuthorDate: Wed Feb 14 18:21:46 2018 -0500

    Merge branch '1.8'

 .../accumulo/core/client/impl/ClientContext.java   |  70 +++++--
 .../accumulo/core/client/impl/ThriftScanner.java   |   6 +-
 .../core/client/impl/ThriftTransportKey.java       |   4 +
 .../core/client/impl/ThriftTransportPool.java      | 141 +++++++-------
 .../org/apache/accumulo/core/data/Mutation.java    |   4 +-
 .../apache/accumulo/fate/zookeeper/ZooCache.java   |  77 +++++---
 .../apache/accumulo/fate/zookeeper/ZooLock.java    |   7 +-
 .../accumulo/fate/zookeeper/ZooCacheTest.java      |  12 +-
 .../accumulo/server/master/LiveTServerSet.java     |   4 +-
 .../org/apache/accumulo/server/util/AdminTest.java |   6 +-
 .../monitor/util/AccumuloMonitorAppender.java      |   4 +-
 .../apache/accumulo/tserver/session/Session.java   |   7 +-
 .../accumulo/tserver/session/SessionManager.java   | 215 +++++++++++++--------
 13 files changed, 344 insertions(+), 213 deletions(-)

diff --cc core/src/main/java/org/apache/accumulo/core/client/impl/ClientContext.java
index 356fa02,f54a7a9..1fec4a3
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ClientContext.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ClientContext.java
@@@ -22,7 -22,7 +22,8 @@@ import static java.util.Objects.require
  import java.io.IOException;
  import java.util.Iterator;
  import java.util.Map;
+ import java.util.concurrent.TimeUnit;
 +import java.util.function.Predicate;
  
  import org.apache.accumulo.core.client.AccumuloException;
  import org.apache.accumulo.core.client.AccumuloSecurityException;
@@@ -41,6 -41,11 +42,9 @@@ import org.apache.accumulo.core.securit
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
 -import com.google.common.base.Predicate;
+ import com.google.common.base.Supplier;
+ import com.google.common.base.Suppliers;
+ 
  /**
   * This class represents any essential configuration and credentials needed to initiate RPC operations throughout the code. It is intended to represent a shared
   * object that contains these things from when the client was first constructed. It is not public API, and is only an internal representation of the context in
@@@ -58,14 -62,18 +62,23 @@@ public class ClientContext 
    private final AccumuloConfiguration rpcConf;
    protected Connector conn;
  
+   // These fields are very frequently accessed (each time a connection is created) and expensive to compute, so cache them.
+   private Supplier<Long> timeoutSupplier;
+   private Supplier<SaslConnectionParams> saslSupplier;
+   private Supplier<SslConnectionParams> sslSupplier;
+   private TCredentials rpcCreds;
+ 
+   /**
+    * Instantiate a client context
+    */
    public ClientContext(Instance instance, Credentials credentials, ClientConfiguration clientConf) {
 +    this(instance, credentials, clientConf, new BatchWriterConfig());
 +  }
 +
 +  public ClientContext(Instance instance, Credentials credentials, ClientConfiguration clientConf, BatchWriterConfig batchWriterConfig) {
      this(instance, credentials, convertClientConfig(requireNonNull(clientConf, "clientConf is null")));
      this.clientConf = clientConf;
 +    this.batchWriterConfig = batchWriterConfig;
    }
  
    /**
@@@ -76,6 -84,43 +89,33 @@@
      creds = requireNonNull(credentials, "credentials is null");
      rpcConf = requireNonNull(serverConf, "serverConf is null");
      clientConf = null;
+ 
 -    timeoutSupplier = new Supplier<Long>() {
 -      @Override
 -      public Long get() {
 -        return getConfiguration().getTimeInMillis(Property.GENERAL_RPC_TIMEOUT);
 -      }
 -    };
++    timeoutSupplier = () -> getConfiguration().getTimeInMillis(Property.GENERAL_RPC_TIMEOUT);
+ 
 -    sslSupplier = new Supplier<SslConnectionParams>() {
 -      @Override
 -      public SslConnectionParams get() {
 -        return SslConnectionParams.forClient(getConfiguration());
 -      }
 -    };
++    sslSupplier = () -> SslConnectionParams.forClient(getConfiguration());
+ 
+     saslSupplier = new Supplier<SaslConnectionParams>() {
+       @Override
+       public SaslConnectionParams get() {
+         // Use the clientConf if we have it
+         if (null != clientConf) {
+           if (!clientConf.hasSasl()) {
+             return null;
+           }
+           return new SaslConnectionParams(clientConf, getCredentials().getToken());
+         }
+         AccumuloConfiguration conf = getConfiguration();
+         if (!conf.getBoolean(Property.INSTANCE_RPC_SASL_ENABLED)) {
+           return null;
+         }
+         return new SaslConnectionParams(conf, getCredentials().getToken());
+       }
+     };
 -
++    
+     timeoutSupplier = Suppliers.memoizeWithExpiration(timeoutSupplier, 100, TimeUnit.MILLISECONDS);
+     sslSupplier = Suppliers.memoizeWithExpiration(sslSupplier, 100, TimeUnit.MILLISECONDS);
+     saslSupplier = Suppliers.memoizeWithExpiration(saslSupplier, 100, TimeUnit.MILLISECONDS);
+ 
    }
  
    /**
diff --cc core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java
index 3472476,b7b1c67..d6637e4
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java
@@@ -414,11 -414,12 +414,11 @@@ public class ThriftScanner 
        scanState.prevLoc = loc;
  
        if (scanState.scanID == null) {
-         String msg = "Starting scan tserver=" + loc.tablet_location + " tablet=" + loc.tablet_extent + " range=" + scanState.range + " ssil="
-             + scanState.serverSideIteratorList + " ssio=" + scanState.serverSideIteratorOptions + " context=" + scanState.classLoaderContext;
-         Thread.currentThread().setName(msg);
 -
+         Thread.currentThread().setName("Starting scan tserver=" + loc.tablet_location + " tableId=" + loc.tablet_extent.getTableId());
  
          if (log.isTraceEnabled()) {
+           String msg = "Starting scan tserver=" + loc.tablet_location + " tablet=" + loc.tablet_extent + " range=" + scanState.range + " ssil="
 -              + scanState.serverSideIteratorList + " ssio=" + scanState.serverSideIteratorOptions;
++              + scanState.serverSideIteratorList + " ssio=" + scanState.serverSideIteratorOptions + " context=" + scanState.classLoaderContext;
            log.trace("tid={} {}", Thread.currentThread().getId(), msg);
            timer = new OpTimer().start();
          }
diff --cc core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
index 9d07af2,c221607..93c3432
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
@@@ -40,8 -41,11 +40,10 @@@ import org.slf4j.Logger
  import org.slf4j.LoggerFactory;
  
  import com.google.common.annotations.VisibleForTesting;
+ import com.google.common.base.Preconditions;
+ import com.google.common.collect.Iterables;
  
  public class ThriftTransportPool {
 -  private static SecurityPermission TRANSPORT_POOL_PERMISSION = new SecurityPermission("transportPoolPermission");
  
    private static final Random random = new Random();
    private long killTime = 1000 * 3;
@@@ -426,13 -449,11 +447,11 @@@
            Collections.shuffle(cachedServers, random);
  
            for (ThriftTransportKey ttk : cachedServers) {
-             for (CachedConnection cachedConnection : getCache().get(ttk)) {
-               if (!cachedConnection.isReserved()) {
-                 cachedConnection.setReserved(true);
-                 final String serverAddr = ttk.getServer().toString();
-                 log.trace("Using existing connection to {}", serverAddr);
-                 return new Pair<>(serverAddr, cachedConnection.transport);
-               }
+             CachedConnection cachedConnection = getCache().get(ttk).reserveAny();
+             if (cachedConnection != null) {
+               final String serverAddr = ttk.getServer().toString();
+               log.trace("Using existing connection to {}", serverAddr);
 -              return new Pair<String,TTransport>(serverAddr, cachedConnection.transport);
++              return new Pair<>(serverAddr, cachedConnection.transport);
              }
            }
          }
@@@ -446,15 -467,12 +465,12 @@@
  
        if (preferCachedConnection) {
          synchronized (this) {
-           List<CachedConnection> cachedConnList = getCache().get(ttk);
-           if (cachedConnList != null) {
-             for (CachedConnection cachedConnection : cachedConnList) {
-               if (!cachedConnection.isReserved()) {
-                 cachedConnection.setReserved(true);
-                 final String serverAddr = ttk.getServer().toString();
-                 log.trace("Using existing connection to {} timeout {}", serverAddr, ttk.getTimeout());
-                 return new Pair<>(serverAddr, cachedConnection.transport);
-               }
+           CachedConnections cachedConns = getCache().get(ttk);
+           if (cachedConns != null) {
+             CachedConnection cachedConnection = cachedConns.reserveAny();
+             if (cachedConnection != null) {
+               final String serverAddr = ttk.getServer().toString();
 -              return new Pair<String,TTransport>(serverAddr, cachedConnection.transport);
++              return new Pair<>(serverAddr, cachedConnection.transport);
              }
            }
          }
diff --cc core/src/main/java/org/apache/accumulo/core/data/Mutation.java
index 5e1a7ba,ebc72f5..6ebdcb0
--- a/core/src/main/java/org/apache/accumulo/core/data/Mutation.java
+++ b/core/src/main/java/org/apache/accumulo/core/data/Mutation.java
@@@ -321,9 -306,6 +321,9 @@@ public class Mutation implements Writab
      if (buffer == null) {
        throw new IllegalStateException("Can not add to mutation after serializing it");
      }
-     long estimatedSizeAfterPut = estRowAndLargeValSize + buffer.size() + cfLength + cqLength + cv.length + (hasts ? 8 : 0) + valLength + 2
-         + 4 * SERIALIZATION_OVERHEAD;
++    long estimatedSizeAfterPut = estRowAndLargeValSize + buffer.size() + cfLength + cqLength + cv.length + (hasts ? 8 : 0) + valLength + 2 + 4
++        * SERIALIZATION_OVERHEAD;
 +    Preconditions.checkArgument(estimatedSizeAfterPut < MAX_MUTATION_SIZE && estimatedSizeAfterPut >= 0, "Maximum mutation size must be less than 2GB ");
      put(cf, cfLength);
      put(cq, cqLength);
      put(cv);
diff --cc fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java
index 1d8a64e,2ef938b..afe7d37
--- a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java
+++ b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java
@@@ -65,9 -60,34 +60,39 @@@ public class ZooCache 
  
    private final ZooReader zReader;
  
+   public static class ZcStat {
+     private long ephemeralOwner;
++    private long mzxid;
+ 
 -    public ZcStat() {
 -
 -    }
++    public ZcStat() {}
+ 
+     private ZcStat(Stat stat) {
+       this.ephemeralOwner = stat.getEphemeralOwner();
++      this.mzxid = stat.getMzxid();
+     }
+ 
+     public long getEphemeralOwner() {
+       return ephemeralOwner;
+     }
+ 
+     private void set(ZcStat cachedStat) {
+       this.ephemeralOwner = cachedStat.ephemeralOwner;
++      this.mzxid = cachedStat.mzxid;
+     }
+ 
+     @VisibleForTesting
+     public void setEphemeralOwner(long ephemeralOwner) {
+       this.ephemeralOwner = ephemeralOwner;
+     }
++
++    public long getMzxid() {
++      return mzxid;
++    }
+   }
+ 
    private static class ImmutableCacheCopies {
      final Map<String,byte[]> cache;
-     final Map<String,Stat> statCache;
+     final Map<String,ZcStat> statCache;
      final Map<String,List<String>> childrenCache;
      final long updateCount;
  
@@@ -369,11 -390,11 +395,11 @@@
                throw new ConcurrentModificationException();
              }
              if (log.isTraceEnabled()) {
 -              log.trace("zookeeper contained " + zPath + " " + (data == null ? null : new String(data, UTF_8)));
 +              log.trace("zookeeper contained {} {}", zPath, (data == null ? null : new String(data, UTF_8)));
              }
            }
-           put(zPath, data, stat);
-           copyStats(status, stat);
+           put(zPath, data, zstat);
+           copyStats(status, zstat);
            return data;
          } finally {
            cacheWriteLock.unlock();
diff --cc server/monitor/src/main/java/org/apache/accumulo/monitor/util/AccumuloMonitorAppender.java
index 5a24e4f,0000000..07c886b
mode 100644,000000..100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/util/AccumuloMonitorAppender.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/util/AccumuloMonitorAppender.java
@@@ -1,221 -1,0 +1,221 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.monitor.util;
 +
 +import static java.nio.charset.StandardCharsets.UTF_8;
 +
 +import java.util.Objects;
 +import java.util.concurrent.Executors;
 +import java.util.concurrent.ScheduledExecutorService;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicBoolean;
 +import java.util.function.Function;
 +import java.util.function.Supplier;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.client.Instance;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.util.HostAndPort;
 +import org.apache.accumulo.core.zookeeper.ZooUtil;
 +import org.apache.accumulo.fate.zookeeper.ZooCache;
++import org.apache.accumulo.fate.zookeeper.ZooCache.ZcStat;
 +import org.apache.accumulo.fate.zookeeper.ZooCacheFactory;
 +import org.apache.accumulo.server.client.HdfsZooInstance;
 +import org.apache.log4j.AppenderSkeleton;
 +import org.apache.log4j.AsyncAppender;
 +import org.apache.log4j.net.SocketAppender;
- import org.apache.zookeeper.data.Stat;
 +
 +public class AccumuloMonitorAppender extends AsyncAppender implements AutoCloseable {
 +
 +  final ScheduledExecutorService executorService;
 +  final AtomicBoolean trackerScheduled;
 +  private int frequency = 0;
 +  private MonitorTracker tracker = null;
 +
 +  /**
 +   * A Log4j Appender which follows the registered location of the active Accumulo monitor service, and forwards log messages to it
 +   */
 +  public AccumuloMonitorAppender() {
 +    // create the background thread to watch for updates to monitor location
 +    trackerScheduled = new AtomicBoolean(false);
 +    executorService = Executors.newSingleThreadScheduledExecutor(runnable -> {
 +      Thread t = new Thread(runnable, AccumuloMonitorAppender.class.getSimpleName() + " Location Tracker");
 +      t.setDaemon(true);
 +      return t;
 +    });
 +  }
 +
 +  public void setFrequency(int millis) {
 +    if (millis > 0) {
 +      frequency = millis;
 +    }
 +  }
 +
 +  public int getFrequency() {
 +    return frequency;
 +  }
 +
 +  // this is just for testing
 +  void setTracker(MonitorTracker monitorTracker) {
 +    tracker = monitorTracker;
 +  }
 +
 +  @Override
 +  public void activateOptions() {
 +    // only schedule it once (in case options get activated more than once); not sure if this is possible
 +    if (trackerScheduled.compareAndSet(false, true)) {
 +      if (frequency <= 0) {
 +        // use default rate of 5 seconds between each check
 +        frequency = 5000;
 +      }
 +      if (tracker == null) {
 +        tracker = new MonitorTracker(this, new ZooCacheLocationSupplier(), new SocketAppenderFactory());
 +      }
 +      executorService.scheduleWithFixedDelay(tracker, frequency, frequency, TimeUnit.MILLISECONDS);
 +    }
 +    super.activateOptions();
 +  }
 +
 +  @Override
 +  public void close() {
 +    if (!executorService.isShutdown()) {
 +      executorService.shutdownNow();
 +    }
 +    super.close();
 +  }
 +
 +  static class MonitorLocation {
 +    private final String location;
 +    private final long modId;
 +
 +    public MonitorLocation(long modId, byte[] location) {
 +      this.modId = modId;
 +      this.location = location == null ? null : new String(location, UTF_8);
 +    }
 +
 +    public boolean hasLocation() {
 +      return location != null;
 +    }
 +
 +    public String getLocation() {
 +      return location;
 +    }
 +
 +    @Override
 +    public boolean equals(Object obj) {
 +      if (obj != null && obj instanceof MonitorLocation) {
 +        MonitorLocation other = (MonitorLocation) obj;
 +        return modId == other.modId && Objects.equals(location, other.location);
 +      }
 +      return false;
 +    }
 +
 +    @Override
 +    public int hashCode() {
 +      return Long.hashCode(modId);
 +    }
 +  }
 +
 +  private static class ZooCacheLocationSupplier implements Supplier<MonitorLocation> {
 +
 +    // path and zooCache are lazily set the first time this tracker is run
 +    // this allows the tracker to be constructed and scheduled during log4j initialization without
 +    // triggering any actual logs from the Accumulo or ZooKeeper code
 +    private String path = null;
 +    private ZooCache zooCache = null;
 +
 +    @Override
 +    public MonitorLocation get() {
 +      // lazily set up path and zooCache (see comment in constructor)
 +      if (this.zooCache == null) {
 +        Instance instance = HdfsZooInstance.getInstance();
 +        this.path = ZooUtil.getRoot(instance) + Constants.ZMONITOR_LOG4J_ADDR;
 +        this.zooCache = new ZooCacheFactory().getZooCache(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut());
 +      }
 +
 +      // get the current location from the cache and update if necessary
-       Stat stat = new Stat();
++      ZcStat stat = new ZcStat();
 +      byte[] loc = zooCache.get(path, stat);
 +      // mzxid is 0 if location does not exist and the non-zero transaction id of the last modification otherwise
 +      return new MonitorLocation(stat.getMzxid(), loc);
 +    }
 +  }
 +
 +  private static class SocketAppenderFactory implements Function<MonitorLocation,AppenderSkeleton> {
 +    @Override
 +    public AppenderSkeleton apply(MonitorLocation loc) {
 +      int defaultPort = Integer.parseUnsignedInt(Property.MONITOR_LOG4J_PORT.getDefaultValue());
 +      HostAndPort remote = HostAndPort.fromString(loc.getLocation());
 +
 +      SocketAppender socketAppender = new SocketAppender();
 +      socketAppender.setApplication(System.getProperty("accumulo.application", "unknown"));
 +      socketAppender.setRemoteHost(remote.getHost());
 +      socketAppender.setPort(remote.getPortOrDefault(defaultPort));
 +
 +      return socketAppender;
 +    }
 +  }
 +
 +  static class MonitorTracker implements Runnable {
 +
 +    private final AccumuloMonitorAppender parentAsyncAppender;
 +    private final Supplier<MonitorLocation> currentLocationSupplier;
 +    private final Function<MonitorLocation,AppenderSkeleton> appenderFactory;
 +
 +    private MonitorLocation lastLocation;
 +    private AppenderSkeleton lastSocketAppender;
 +
 +    public MonitorTracker(AccumuloMonitorAppender appender, Supplier<MonitorLocation> currentLocationSupplier,
 +        Function<MonitorLocation,AppenderSkeleton> appenderFactory) {
 +      this.parentAsyncAppender = Objects.requireNonNull(appender);
 +      this.appenderFactory = Objects.requireNonNull(appenderFactory);
 +      this.currentLocationSupplier = Objects.requireNonNull(currentLocationSupplier);
 +
 +      this.lastLocation = new MonitorLocation(0, null);
 +      this.lastSocketAppender = null;
 +    }
 +
 +    @Override
 +    public void run() {
 +      try {
 +        MonitorLocation currentLocation = currentLocationSupplier.get();
 +        // detect change
 +        if (!currentLocation.equals(lastLocation)) {
 +          // clean up old appender
 +          if (lastSocketAppender != null) {
 +            parentAsyncAppender.removeAppender(lastSocketAppender);
 +            lastSocketAppender.close();
 +            lastSocketAppender = null;
 +          }
 +          // create a new one
 +          if (currentLocation.hasLocation()) {
 +            lastSocketAppender = appenderFactory.apply(currentLocation);
 +            lastSocketAppender.activateOptions();
 +            parentAsyncAppender.addAppender(lastSocketAppender);
 +          }
 +          // update the last location only if switching was successful
 +          lastLocation = currentLocation;
 +        }
 +      } catch (Exception e) {
 +        // dump any non-fatal problems to the console, but let it run again
 +        e.printStackTrace();
 +      }
 +    }
 +
 +  }
 +
 +}
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
index 5f0a962,e0fb795..06bd6a5
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
@@@ -26,8 -27,9 +27,10 @@@ import java.util.Map
  import java.util.Map.Entry;
  import java.util.Set;
  import java.util.TimerTask;
+ import java.util.concurrent.ConcurrentHashMap;
+ import java.util.concurrent.ConcurrentMap;
  
 +import org.apache.accumulo.core.client.impl.Table;
  import org.apache.accumulo.core.client.impl.Translator;
  import org.apache.accumulo.core.client.impl.Translators;
  import org.apache.accumulo.core.conf.AccumuloConfiguration;
@@@ -171,19 -218,23 +219,22 @@@ public class SessionManager 
  
    private void sweep(final long maxIdle, final long maxUpdateIdle) {
      List<Session> sessionsToCleanup = new ArrayList<>();
-     synchronized (this) {
-       Iterator<Session> iter = sessions.values().iterator();
-       while (iter.hasNext()) {
-         Session session = iter.next();
-         long configuredIdle = maxIdle;
-         if (session instanceof UpdateSession) {
-           configuredIdle = maxUpdateIdle;
-         }
-         long idleTime = System.currentTimeMillis() - session.lastAccessTime;
-         if (idleTime > configuredIdle && !session.reserved) {
-           log.info("Closing idle session from user={}, client={}, idle={}ms", session.getUser(), session.client, idleTime);
-           iter.remove();
-           sessionsToCleanup.add(session);
 -
+     Iterator<Session> iter = sessions.values().iterator();
+     while (iter.hasNext()) {
+       Session session = iter.next();
+       synchronized (session) {
+         if (session.state == State.UNRESERVED) {
+           long configuredIdle = maxIdle;
+           if (session instanceof UpdateSession) {
+             configuredIdle = maxUpdateIdle;
+           }
+           long idleTime = System.currentTimeMillis() - session.lastAccessTime;
+           if (idleTime > configuredIdle) {
 -            log.info("Closing idle session from user=" + session.getUser() + ", client=" + session.client + ", idle=" + idleTime + "ms");
++            log.info("Closing idle session from user={}, client={}, idle={}ms", session.getUser(), session.client, idleTime);
+             iter.remove();
+             sessionsToCleanup.add(session);
+             session.state = State.REMOVED;
+           }
          }
        }
      }
@@@ -231,8 -289,8 +289,9 @@@
      }
    }
  
-   public synchronized Map<Table.ID,MapCounter<ScanRunState>> getActiveScansPerTable() {
 -  public Map<String,MapCounter<ScanRunState>> getActiveScansPerTable() {
 -    Map<String,MapCounter<ScanRunState>> counts = new HashMap<>();
++  public Map<Table.ID,MapCounter<ScanRunState>> getActiveScansPerTable() {
 +    Map<Table.ID,MapCounter<ScanRunState>> counts = new HashMap<>();
++
      Set<Entry<Long,Session>> copiedIdleSessions = new HashSet<>();
  
      synchronized (idleSessions) {

-- 
To stop receiving notification emails like this one, please contact
kturner@apache.org.

Mime
View raw message