accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e..@apache.org
Subject [3/3] accumulo git commit: ACCUMULO-3549 merge to master
Date Mon, 02 Feb 2015 22:53:21 GMT
ACCUMULO-3549 merge to master


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/a6a2be8a
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/a6a2be8a
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/a6a2be8a

Branch: refs/heads/master
Commit: a6a2be8aa9c607d5ebe7f902448e6245a130b716
Parents: c3493c1 c5d2bd5
Author: Eric C. Newton <eric.newton@gmail.com>
Authored: Mon Feb 2 17:52:50 2015 -0500
Committer: Eric C. Newton <eric.newton@gmail.com>
Committed: Mon Feb 2 17:52:50 2015 -0500

----------------------------------------------------------------------
 .../accumulo/core/client/impl/TabletLocator.java     |  4 ++++
 .../org/apache/accumulo/tserver/TabletServer.java    | 15 +++++++++++++++
 2 files changed, 19 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/a6a2be8a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletLocator.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/client/impl/TabletLocator.java
index 782a599,5f30ddc..2391fe6
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletLocator.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletLocator.java
@@@ -92,8 -92,12 +92,12 @@@ public abstract class TabletLocator 
  
    private static HashMap<LocatorKey,TabletLocator> locators = new HashMap<LocatorKey,TabletLocator>();
  
+   public static synchronized void clearLocators() {
+     locators.clear();
+   }
+ 
 -  public static synchronized TabletLocator getLocator(Instance instance, Text tableId) {
 -
 +  public static synchronized TabletLocator getLocator(ClientContext context, Text tableId) {
 +    Instance instance = context.getInstance();
      LocatorKey key = new LocatorKey(instance.getInstanceID(), tableId);
      TabletLocator tl = locators.get(key);
      if (tl == null) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/a6a2be8a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 7d49e65,dbececd..a5675dc
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@@ -59,9 -68,9 +60,10 @@@ import org.apache.accumulo.core.client.
  import org.apache.accumulo.core.client.Instance;
  import org.apache.accumulo.core.client.impl.CompressedIterators;
  import org.apache.accumulo.core.client.impl.CompressedIterators.IterConfig;
 +import org.apache.accumulo.core.client.impl.DurabilityImpl;
  import org.apache.accumulo.core.client.impl.ScannerImpl;
  import org.apache.accumulo.core.client.impl.Tables;
+ import org.apache.accumulo.core.client.impl.TabletLocator;
  import org.apache.accumulo.core.client.impl.TabletType;
  import org.apache.accumulo.core.client.impl.Translator;
  import org.apache.accumulo.core.client.impl.Translator.TKeyExtentTranslator;
@@@ -248,157 -254,966 +250,170 @@@ public class TabletServer extends Accum
    private static final long MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS = 1000;
    private static final long RECENTLY_SPLIT_MILLIES = 60 * 1000;
    private static final long TIME_BETWEEN_GC_CHECKS = 5000;
+   private static final long TIME_BETWEEN_LOCATOR_CACHE_CLEARS = 60 * 60 * 1000;
 +  private static final Set<Column> EMPTY_COLUMNS = Collections.emptySet();
  
 -  private TabletServerLogger logger;
 -
 -  protected TabletServerMinCMetrics mincMetrics = new TabletServerMinCMetrics();
 -
 -  private ServerConfiguration serverConfig;
 -  private LogSorter logSorter = null;
 -
 -  public TabletServer(ServerConfiguration conf, VolumeManager fs) {
 -    super();
 -    this.serverConfig = conf;
 -    this.instance = conf.getInstance();
 -    this.fs = fs;
 -    this.logSorter = new LogSorter(instance, fs, getSystemConfiguration());
 -    SimpleTimer.getInstance().schedule(new Runnable() {
 -      @Override
 -      public void run() {
 -        synchronized (onlineTablets) {
 -          long now = System.currentTimeMillis();
 -          for (Tablet tablet : onlineTablets.values())
 -            try {
 -              tablet.updateRates(now);
 -            } catch (Exception ex) {
 -              log.error(ex, ex);
 -            }
 -        }
 -      }
 -    }, TIME_BETWEEN_GC_CHECKS, TIME_BETWEEN_GC_CHECKS);
 -    SimpleTimer.getInstance().schedule(new Runnable() {
 -      @Override
 -      public void run() {
 -        TabletLocator.clearLocators();
 -      }
 -    }, jitter(TIME_BETWEEN_LOCATOR_CACHE_CLEARS), jitter(TIME_BETWEEN_LOCATOR_CACHE_CLEARS));
 -  }
 -
 -  private static long jitter(long ms) {
 -    Random r = new Random();
 -    // add a random 10% wait
 -    return (long)((1. + (r.nextDouble() / 10)) * ms);
 -  }
 -
 -  private synchronized static void logGCInfo(AccumuloConfiguration conf) {
 -    long now = System.currentTimeMillis();
 -
 -    List<GarbageCollectorMXBean> gcmBeans = ManagementFactory.getGarbageCollectorMXBeans();
 -    Runtime rt = Runtime.getRuntime();
 -
 -    StringBuilder sb = new StringBuilder("gc");
 -
 -    boolean sawChange = false;
 -
 -    long maxIncreaseInCollectionTime = 0;
 -
 -    for (GarbageCollectorMXBean gcBean : gcmBeans) {
 -      Long prevTime = prevGcTime.get(gcBean.getName());
 -      long pt = 0;
 -      if (prevTime != null) {
 -        pt = prevTime;
 -      }
 -
 -      long time = gcBean.getCollectionTime();
 -
 -      if (time - pt != 0) {
 -        sawChange = true;
 -      }
 -
 -      long increaseInCollectionTime = time - pt;
 -      sb.append(String.format(" %s=%,.2f(+%,.2f) secs", gcBean.getName(), time / 1000.0,
increaseInCollectionTime / 1000.0));
 -      maxIncreaseInCollectionTime = Math.max(increaseInCollectionTime, maxIncreaseInCollectionTime);
 -      prevGcTime.put(gcBean.getName(), time);
 -    }
 -
 -    long mem = rt.freeMemory();
 -    if (maxIncreaseInCollectionTime == 0) {
 -      gcTimeIncreasedCount = 0;
 -    } else {
 -      gcTimeIncreasedCount++;
 -      if (gcTimeIncreasedCount > 3 && mem < rt.maxMemory() * 0.05) {
 -        log.warn("Running low on memory");
 -        gcTimeIncreasedCount = 0;
 -      }
 -    }
 -
 -    if (mem > lastMemorySize) {
 -      sawChange = true;
 -    }
 -
 -    String sign = "+";
 -    if (mem - lastMemorySize <= 0) {
 -      sign = "";
 -    }
 -
 -    sb.append(String.format(" freemem=%,d(%s%,d) totalmem=%,d", mem, sign, (mem - lastMemorySize),
rt.totalMemory()));
 -
 -    if (sawChange) {
 -      log.debug(sb.toString());
 -    }
 -
 -    final long keepAliveTimeout = conf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT);
 -    if (lastMemoryCheckTime > 0 && lastMemoryCheckTime < now) {
 -      long diff = now - lastMemoryCheckTime;
 -      if (diff > keepAliveTimeout) {
 -        log.warn(String.format("GC pause checker not called in a timely fashion. Expected every %.1f seconds but was %.1f seconds since last check",
 -            TIME_BETWEEN_GC_CHECKS / 1000., diff / 1000.));
 -      }
 -      lastMemoryCheckTime = now;
 -      return;
 -    }
 -
 -    if (maxIncreaseInCollectionTime > keepAliveTimeout) {
 -      Halt.halt("Garbage collection may be interfering with lock keep-alive.  Halting.",
-1);
 -    }
 -
 -    lastMemorySize = mem;
 -    lastMemoryCheckTime = now;
 -  }
 -
 -  private TabletStatsKeeper statsKeeper;
 -
 -  private static class Session {
 -    long lastAccessTime;
 -    long startTime;
 -    String user;
 -    String client = TServerUtils.clientAddress.get();
 -    public boolean reserved;
 -
 -    public void cleanup() {}
 -  }
 -
 -  private static class SessionManager {
 -
 -    SecureRandom random;
 -    Map<Long,Session> sessions;
 -    long maxIdle;
 -
 -    SessionManager(AccumuloConfiguration conf) {
 -      random = new SecureRandom();
 -      sessions = new HashMap<Long,Session>();
 -
 -      maxIdle = conf.getTimeInMillis(Property.TSERV_SESSION_MAXIDLE);
 -
 -      Runnable r = new Runnable() {
 -        @Override
 -        public void run() {
 -          sweep(maxIdle);
 -        }
 -      };
 -
 -      SimpleTimer.getInstance().schedule(r, 0, Math.max(maxIdle / 2, 1000));
 -    }
 -
 -    synchronized long createSession(Session session, boolean reserve) {
 -      long sid = random.nextLong();
 -
 -      while (sessions.containsKey(sid)) {
 -        sid = random.nextLong();
 -      }
 -
 -      sessions.put(sid, session);
 -
 -      session.reserved = reserve;
 -
 -      session.startTime = session.lastAccessTime = System.currentTimeMillis();
 -
 -      return sid;
 -    }
 -
 -    long getMaxIdleTime() {
 -      return maxIdle;
 -    }
 -
 -    /**
 -     * while a session is reserved, it cannot be canceled or removed
 -     */
 -    synchronized Session reserveSession(long sessionId) {
 -      Session session = sessions.get(sessionId);
 -      if (session != null) {
 -        if (session.reserved)
 -          throw new IllegalStateException();
 -        session.reserved = true;
 -      }
 -
 -      return session;
 -
 -    }
 -
 -    synchronized Session reserveSession(long sessionId, boolean wait) {
 -      Session session = sessions.get(sessionId);
 -      if (session != null) {
 -        while (wait && session.reserved) {
 -          try {
 -            wait(1000);
 -          } catch (InterruptedException e) {
 -            throw new RuntimeException();
 -          }
 -        }
 -
 -        if (session.reserved)
 -          throw new IllegalStateException();
 -        session.reserved = true;
 -      }
 -
 -      return session;
 -
 -    }
 -
 -    synchronized void unreserveSession(Session session) {
 -      if (!session.reserved)
 -        throw new IllegalStateException();
 -      notifyAll();
 -      session.reserved = false;
 -      session.lastAccessTime = System.currentTimeMillis();
 -    }
 -
 -    synchronized void unreserveSession(long sessionId) {
 -      Session session = getSession(sessionId);
 -      if (session != null)
 -        unreserveSession(session);
 -    }
 -
 -    synchronized Session getSession(long sessionId) {
 -      Session session = sessions.get(sessionId);
 -      if (session != null)
 -        session.lastAccessTime = System.currentTimeMillis();
 -      return session;
 -    }
 -
 -    Session removeSession(long sessionId) {
 -      return removeSession(sessionId, false);
 -    }
 -
 -    Session removeSession(long sessionId, boolean unreserve) {
 -      Session session = null;
 -      synchronized (this) {
 -        session = sessions.remove(sessionId);
 -        if (unreserve && session != null)
 -          unreserveSession(session);
 -      }
 -
 -      // do clean up out side of lock..
 -      if (session != null)
 -        session.cleanup();
 -
 -      return session;
 -    }
 -
 -    private void sweep(long maxIdle) {
 -      ArrayList<Session> sessionsToCleanup = new ArrayList<Session>();
 -      synchronized (this) {
 -        Iterator<Session> iter = sessions.values().iterator();
 -        while (iter.hasNext()) {
 -          Session session = iter.next();
 -          long idleTime = System.currentTimeMillis() - session.lastAccessTime;
 -          if (idleTime > maxIdle && !session.reserved) {
 -            log.info("Closing idle session from user=" + session.user + ", client=" + session.client
+ ", idle=" + idleTime + "ms");
 -            iter.remove();
 -            sessionsToCleanup.add(session);
 -          }
 -        }
 -      }
 -
 -      // do clean up outside of lock
 -      for (Session session : sessionsToCleanup) {
 -        session.cleanup();
 -      }
 -    }
 -
 -    synchronized void removeIfNotAccessed(final long sessionId, final long delay) {
 -      Session session = sessions.get(sessionId);
 -      if (session != null) {
 -        final long removeTime = session.lastAccessTime;
 -        TimerTask r = new TimerTask() {
 -          @Override
 -          public void run() {
 -            Session sessionToCleanup = null;
 -            synchronized (SessionManager.this) {
 -              Session session2 = sessions.get(sessionId);
 -              if (session2 != null && session2.lastAccessTime == removeTime &&
!session2.reserved) {
 -                log.info("Closing not accessed session from user=" + session2.user + ", client=" + session2.client + ", duration=" + delay + "ms");
 -                sessions.remove(sessionId);
 -                sessionToCleanup = session2;
 -              }
 -            }
 -
 -            // call clean up outside of lock
 -            if (sessionToCleanup != null)
 -              sessionToCleanup.cleanup();
 -          }
 -        };
 -
 -        SimpleTimer.getInstance().schedule(r, delay);
 -      }
 -    }
 -
 -    public synchronized Map<String,MapCounter<ScanRunState>> getActiveScansPerTable()
{
 -      Map<String,MapCounter<ScanRunState>> counts = new HashMap<String,MapCounter<ScanRunState>>();
 -      for (Entry<Long,Session> entry : sessions.entrySet()) {
 -
 -        Session session = entry.getValue();
 -        @SuppressWarnings("rawtypes")
 -        ScanTask nbt = null;
 -        String tableID = null;
 -
 -        if (session instanceof ScanSession) {
 -          ScanSession ss = (ScanSession) session;
 -          nbt = ss.nextBatchTask;
 -          tableID = ss.extent.getTableId().toString();
 -        } else if (session instanceof MultiScanSession) {
 -          MultiScanSession mss = (MultiScanSession) session;
 -          nbt = mss.lookupTask;
 -          tableID = mss.threadPoolExtent.getTableId().toString();
 -        }
 -
 -        if (nbt == null)
 -          continue;
 -
 -        ScanRunState srs = nbt.getScanRunState();
 -
 -        if (srs == ScanRunState.FINISHED)
 -          continue;
 -
 -        MapCounter<ScanRunState> stateCounts = counts.get(tableID);
 -        if (stateCounts == null) {
 -          stateCounts = new MapCounter<ScanRunState>();
 -          counts.put(tableID, stateCounts);
 -        }
 -
 -        stateCounts.increment(srs, 1);
 -      }
 -
 -      return counts;
 -    }
 -
 -    public synchronized List<ActiveScan> getActiveScans() {
 -
 -      ArrayList<ActiveScan> activeScans = new ArrayList<ActiveScan>();
 -
 -      long ct = System.currentTimeMillis();
 -
 -      for (Entry<Long,Session> entry : sessions.entrySet()) {
 -        Session session = entry.getValue();
 -        if (session instanceof ScanSession) {
 -          ScanSession ss = (ScanSession) session;
 -
 -          ScanState state = ScanState.RUNNING;
 -
 -          ScanTask<ScanBatch> nbt = ss.nextBatchTask;
 -          if (nbt == null) {
 -            state = ScanState.IDLE;
 -          } else {
 -            switch (nbt.getScanRunState()) {
 -              case QUEUED:
 -                state = ScanState.QUEUED;
 -                break;
 -              case FINISHED:
 -                state = ScanState.IDLE;
 -                break;
 -              case RUNNING:
 -              default:
 -                /* do nothing */
 -                break;
 -            }
 -          }
 -
 -          ActiveScan activeScan = new ActiveScan(ss.client, ss.user, ss.extent.getTableId().toString(),
ct - ss.startTime, ct - ss.lastAccessTime,
 -              ScanType.SINGLE, state, ss.extent.toThrift(), Translator.translate(ss.columnSet,
Translators.CT), ss.ssiList, ss.ssio,
 -              ss.auths.getAuthorizationsBB());
 -
 -          // scanId added by ACCUMULO-2641 is an optional thrift argument and not available
in ActiveScan constructor
 -          activeScan.setScanId(entry.getKey());
 -          activeScans.add(activeScan);
 -
 -        } else if (session instanceof MultiScanSession) {
 -          MultiScanSession mss = (MultiScanSession) session;
 -
 -          ScanState state = ScanState.RUNNING;
 -
 -          ScanTask<MultiScanResult> nbt = mss.lookupTask;
 -          if (nbt == null) {
 -            state = ScanState.IDLE;
 -          } else {
 -            switch (nbt.getScanRunState()) {
 -              case QUEUED:
 -                state = ScanState.QUEUED;
 -                break;
 -              case FINISHED:
 -                state = ScanState.IDLE;
 -                break;
 -              case RUNNING:
 -              default:
 -                /* do nothing */
 -                break;
 -            }
 -          }
 -
 -          activeScans.add(new ActiveScan(mss.client, mss.user, mss.threadPoolExtent.getTableId().toString(),
ct - mss.startTime, ct - mss.lastAccessTime,
 -              ScanType.BATCH, state, mss.threadPoolExtent.toThrift(), Translator.translate(mss.columnSet,
Translators.CT), mss.ssiList, mss.ssio, mss.auths
 -                  .getAuthorizationsBB()));
 -        }
 -      }
 -
 -      return activeScans;
 -    }
 -  }
 -
 -  static class TservConstraintEnv implements Environment {
 -
 -    private TCredentials credentials;
 -    private SecurityOperation security;
 -    private Authorizations auths;
 -    private KeyExtent ke;
 -
 -    TservConstraintEnv(SecurityOperation secOp, TCredentials credentials) {
 -      this.security = secOp;
 -      this.credentials = credentials;
 -    }
 -
 -    void setExtent(KeyExtent ke) {
 -      this.ke = ke;
 -    }
 -
 -    @Override
 -    public KeyExtent getExtent() {
 -      return ke;
 -    }
 -
 -    @Override
 -    public String getUser() {
 -      return credentials.getPrincipal();
 -    }
 -
 -    @Override
 -    @Deprecated
 -    public Authorizations getAuthorizations() {
 -      if (auths == null)
 -        try {
 -          this.auths = security.getUserAuthorizations(credentials);
 -        } catch (ThriftSecurityException e) {
 -          throw new RuntimeException(e);
 -        }
 -      return auths;
 -    }
 -
 -    @Override
 -    public AuthorizationContainer getAuthorizationsContainer() {
 -      return new AuthorizationContainer() {
 -        @Override
 -        public boolean contains(ByteSequence auth) {
 -          try {
 -            return security.userHasAuthorizations(credentials,
 -                Collections.<ByteBuffer> singletonList(ByteBuffer.wrap(auth.getBackingArray(),
auth.offset(), auth.length())));
 -          } catch (ThriftSecurityException e) {
 -            throw new RuntimeException(e);
 -          }
 -        }
 -      };
 -    }
 -  }
 -
 -  private abstract class ScanTask<T> implements RunnableFuture<T> {
 -
 -    protected AtomicBoolean interruptFlag;
 -    protected ArrayBlockingQueue<Object> resultQueue;
 -    protected AtomicInteger state;
 -    protected AtomicReference<ScanRunState> runState;
 -
 -    private static final int INITIAL = 1;
 -    private static final int ADDED = 2;
 -    private static final int CANCELED = 3;
 +  private final GarbageCollectionLogger gcLogger = new GarbageCollectionLogger();
 +  private final TransactionWatcher watcher = new TransactionWatcher();
 +  private final ZooCache masterLockCache = new ZooCache();
  
 -    ScanTask() {
 -      interruptFlag = new AtomicBoolean(false);
 -      runState = new AtomicReference<ScanRunState>(ScanRunState.QUEUED);
 -      state = new AtomicInteger(INITIAL);
 -      resultQueue = new ArrayBlockingQueue<Object>(1);
 -    }
 -
 -    protected void addResult(Object o) {
 -      if (state.compareAndSet(INITIAL, ADDED))
 -        resultQueue.add(o);
 -      else if (state.get() == ADDED)
 -        throw new IllegalStateException("Tried to add more than one result");
 -    }
 -
 -    @Override
 -    public boolean cancel(boolean mayInterruptIfRunning) {
 -      if (!mayInterruptIfRunning)
 -        throw new IllegalArgumentException("Cancel will always attempt to interupt running next batch task");
 -
 -      if (state.get() == CANCELED)
 -        return true;
 -
 -      if (state.compareAndSet(INITIAL, CANCELED)) {
 -        interruptFlag.set(true);
 -        resultQueue = null;
 -        return true;
 -      }
 -
 -      return false;
 -    }
 -
 -    @Override
 -    public T get() throws InterruptedException, ExecutionException {
 -      throw new UnsupportedOperationException();
 -    }
 -
 -    @SuppressWarnings("unchecked")
 -    @Override
 -    public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException,
TimeoutException {
 -
 -      ArrayBlockingQueue<Object> localRQ = resultQueue;
 -
 -      if (state.get() == CANCELED)
 -        throw new CancellationException();
 -
 -      if (localRQ == null && state.get() == ADDED)
 -        throw new IllegalStateException("Tried to get result twice");
 -
 -      Object r = localRQ.poll(timeout, unit);
 -
 -      // could have been canceled while waiting
 -      if (state.get() == CANCELED) {
 -        if (r != null)
 -          throw new IllegalStateException("Nothing should have been added when in canceled state");
 -
 -        throw new CancellationException();
 -      }
 -
 -      if (r == null)
 -        throw new TimeoutException();
 -
 -      // make this method stop working now that something is being
 -      // returned
 -      resultQueue = null;
 -
 -      if (r instanceof Throwable)
 -        throw new ExecutionException((Throwable) r);
 -
 -      return (T) r;
 -    }
 -
 -    @Override
 -    public boolean isCancelled() {
 -      return state.get() == CANCELED;
 -    }
 -
 -    @Override
 -    public boolean isDone() {
 -      return runState.get().equals(ScanRunState.FINISHED);
 -    }
 -
 -    public ScanRunState getScanRunState() {
 -      return runState.get();
 -    }
 -
 -  }
 -
 -  private static class ConditionalSession extends Session {
 -    public TCredentials credentials;
 -    public Authorizations auths;
 -    public String tableId;
 -    public AtomicBoolean interruptFlag;
 -
 -    @Override
 -    public void cleanup() {
 -      interruptFlag.set(true);
 -    }
 -  }
 -
 -  private static class UpdateSession extends Session {
 -    public Tablet currentTablet;
 -    public MapCounter<Tablet> successfulCommits = new MapCounter<Tablet>();
 -    Map<KeyExtent,Long> failures = new HashMap<KeyExtent,Long>();
 -    HashMap<KeyExtent,SecurityErrorCode> authFailures = new HashMap<KeyExtent,SecurityErrorCode>();
 -    public Violations violations;
 -    public TCredentials credentials;
 -    public long totalUpdates = 0;
 -    public long flushTime = 0;
 -    Stat prepareTimes = new Stat();
 -    Stat walogTimes = new Stat();
 -    Stat commitTimes = new Stat();
 -    Stat authTimes = new Stat();
 -    public Map<Tablet,List<Mutation>> queuedMutations = new HashMap<Tablet,List<Mutation>>();
 -    public long queuedMutationSize = 0;
 -    TservConstraintEnv cenv = null;
 -  }
 -
 -  private static class ScanSession extends Session {
 -    public KeyExtent extent;
 -    public HashSet<Column> columnSet;
 -    public List<IterInfo> ssiList;
 -    public Map<String,Map<String,String>> ssio;
 -    public Authorizations auths;
 -    public long entriesReturned = 0;
 -    public Stat nbTimes = new Stat();
 -    public long batchCount = 0;
 -    public volatile ScanTask<ScanBatch> nextBatchTask;
 -    public AtomicBoolean interruptFlag;
 -    public Scanner scanner;
 -    public long readaheadThreshold = Constants.SCANNER_DEFAULT_READAHEAD_THRESHOLD;
 -
 -    @Override
 -    public void cleanup() {
 -      try {
 -        if (nextBatchTask != null)
 -          nextBatchTask.cancel(true);
 -      } finally {
 -        if (scanner != null)
 -          scanner.close();
 -      }
 -    }
 -
 -  }
 -
 -  private static class MultiScanSession extends Session {
 -    HashSet<Column> columnSet;
 -    Map<KeyExtent,List<Range>> queries;
 -    public List<IterInfo> ssiList;
 -    public Map<String,Map<String,String>> ssio;
 -    public Authorizations auths;
 -
 -    // stats
 -    int numRanges;
 -    int numTablets;
 -    int numEntries;
 -    long totalLookupTime;
 -
 -    public volatile ScanTask<MultiScanResult> lookupTask;
 -    public KeyExtent threadPoolExtent;
 -
 -    @Override
 -    public void cleanup() {
 -      if (lookupTask != null)
 -        lookupTask.cancel(true);
 -    }
 -  }
 -
 -  /**
 -   * This little class keeps track of writes in progress and allows readers to wait for
writes that started before the read. It assumes that the operation ids
 -   * are monotonically increasing.
 -   *
 -   */
 -  static class WriteTracker {
 -    private static AtomicLong operationCounter = new AtomicLong(1);
 -    private Map<TabletType,TreeSet<Long>> inProgressWrites = new EnumMap<TabletType,TreeSet<Long>>(TabletType.class);
 -
 -    WriteTracker() {
 -      for (TabletType ttype : TabletType.values()) {
 -        inProgressWrites.put(ttype, new TreeSet<Long>());
 -      }
 -    }
 -
 -    synchronized long startWrite(TabletType ttype) {
 -      long operationId = operationCounter.getAndIncrement();
 -      inProgressWrites.get(ttype).add(operationId);
 -      return operationId;
 -    }
 -
 -    synchronized void finishWrite(long operationId) {
 -      if (operationId == -1)
 -        return;
 -
 -      boolean removed = false;
 -
 -      for (TabletType ttype : TabletType.values()) {
 -        removed = inProgressWrites.get(ttype).remove(operationId);
 -        if (removed)
 -          break;
 -      }
 -
 -      if (!removed) {
 -        throw new IllegalArgumentException("Attempted to finish write not in progress, 
operationId " + operationId);
 -      }
 -
 -      this.notifyAll();
 -    }
 -
 -    synchronized void waitForWrites(TabletType ttype) {
 -      long operationId = operationCounter.getAndIncrement();
 -      while (inProgressWrites.get(ttype).floor(operationId) != null) {
 -        try {
 -          this.wait();
 -        } catch (InterruptedException e) {
 -          log.error(e, e);
 -        }
 -      }
 -    }
 +  private final TabletServerLogger logger;
  
 -    public long startWrite(Set<Tablet> keySet) {
 -      if (keySet.size() == 0)
 -        return -1;
 +  private final TabletServerMetricsFactory metricsFactory;
 +  private final Metrics updateMetrics;
 +  private final Metrics scanMetrics;
 +  private final Metrics mincMetrics;
  
 -      ArrayList<KeyExtent> extents = new ArrayList<KeyExtent>(keySet.size());
 -
 -      for (Tablet tablet : keySet)
 -        extents.add(tablet.getExtent());
 -
 -      return startWrite(TabletType.type(extents));
 -    }
 +  public Metrics getMinCMetrics() {
 +    return mincMetrics;
    }
  
 -  public AccumuloConfiguration getSystemConfiguration() {
 -    return serverConfig.getConfiguration();
 -  }
 -
 -  TransactionWatcher watcher = new TransactionWatcher();
 -
 -  private class ThriftClientHandler extends ClientServiceHandler implements TabletClientService.Iface
{
 -
 -    SessionManager sessionManager;
 +  private final LogSorter logSorter;
 +  private ReplicationWorker replWorker = null;
 +  private final TabletStatsKeeper statsKeeper;
 +  private final AtomicInteger logIdGenerator = new AtomicInteger();
  
 -    AccumuloConfiguration acuConf = getSystemConfiguration();
 +  private final AtomicLong flushCounter = new AtomicLong(0);
 +  private final AtomicLong syncCounter = new AtomicLong(0);
  
 -    TabletServerUpdateMetrics updateMetrics = new TabletServerUpdateMetrics();
 +  private final VolumeManager fs;
  
 -    TabletServerScanMetrics scanMetrics = new TabletServerScanMetrics();
 +  private final SortedMap<KeyExtent,Tablet> onlineTablets = Collections.synchronizedSortedMap(new
TreeMap<KeyExtent,Tablet>());
 +  private final SortedSet<KeyExtent> unopenedTablets = Collections.synchronizedSortedSet(new
TreeSet<KeyExtent>());
 +  private final SortedSet<KeyExtent> openingTablets = Collections.synchronizedSortedSet(new
TreeSet<KeyExtent>());
 +  @SuppressWarnings("unchecked")
 +  private final Map<KeyExtent,Long> recentlyUnloadedCache = Collections.synchronizedMap(new
LRUMap(1000));
  
 -    WriteTracker writeTracker = new WriteTracker();
 +  private final TabletServerResourceManager resourceManager;
 +  private final SecurityOperation security;
  
 -    private RowLocks rowLocks = new RowLocks();
 +  private final BlockingDeque<MasterMessage> masterMessages = new LinkedBlockingDeque<MasterMessage>();
  
 -    ThriftClientHandler() {
 -      super(instance, watcher, fs);
 -      log.debug(ThriftClientHandler.class.getName() + " created");
 -      sessionManager = new SessionManager(getSystemConfiguration());
 -      // Register the metrics MBean
 -      try {
 -        updateMetrics.register();
 -        scanMetrics.register();
 -      } catch (Exception e) {
 -        log.error("Exception registering MBean with MBean Server", e);
 -      }
 -    }
 -
 -    @Override
 -    public List<TKeyExtent> bulkImport(TInfo tinfo, TCredentials credentials, long
tid, Map<TKeyExtent,Map<String,MapFileInfo>> files, boolean setTime)
 -        throws ThriftSecurityException {
 -
 -      if (!security.canPerformSystemActions(credentials))
 -        throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
 +  private Thread majorCompactorThread;
  
 -      List<TKeyExtent> failures = new ArrayList<TKeyExtent>();
 +  private HostAndPort replicationAddress;
 +  private HostAndPort clientAddress;
  
 -      for (Entry<TKeyExtent,Map<String,MapFileInfo>> entry : files.entrySet())
{
 -        TKeyExtent tke = entry.getKey();
 -        Map<String,MapFileInfo> fileMap = entry.getValue();
 -        Map<FileRef,MapFileInfo> fileRefMap = new HashMap<FileRef,MapFileInfo>();
 -        for (Entry<String,MapFileInfo> mapping : fileMap.entrySet()) {
 -          Path path = new Path(mapping.getKey());
 -          FileSystem ns = fs.getVolumeByPath(path).getFileSystem();
 -          path = ns.makeQualified(path);
 -          fileRefMap.put(new FileRef(path.toString(), path), mapping.getValue());
 -        }
 +  private volatile boolean serverStopRequested = false;
 +  private volatile boolean majorCompactorDisabled = false;
 +  private volatile boolean shutdownComplete = false;
  
 -        Tablet importTablet = onlineTablets.get(new KeyExtent(tke));
 +  private ZooLock tabletServerLock;
  
 -        if (importTablet == null) {
 -          failures.add(tke);
 -        } else {
 -          try {
 -            importTablet.importMapFiles(tid, fileRefMap, setTime);
 -          } catch (IOException ioe) {
 -            log.info("files " + fileMap.keySet() + " not imported to " + new KeyExtent(tke)
+ ": " + ioe.getMessage());
 -            failures.add(tke);
 -          }
 -        }
 -      }
 -      return failures;
 -    }
 +  private TServer server;
 +  private TServer replServer;
  
 -    private class NextBatchTask extends ScanTask<ScanBatch> {
 +  private DistributedWorkQueue bulkFailedCopyQ;
  
 -      private long scanID;
 +  private String lockID;
  
 -      NextBatchTask(long scanID, AtomicBoolean interruptFlag) {
 -        this.scanID = scanID;
 -        this.interruptFlag = interruptFlag;
 +  public static final AtomicLong seekCount = new AtomicLong(0);
  
 -        if (interruptFlag.get())
 -          cancel(true);
 -      }
 +  private final AtomicLong totalMinorCompactions = new AtomicLong(0);
 +  private final ServerConfigurationFactory confFactory;
  
 +  public TabletServer(ServerConfigurationFactory confFactory, VolumeManager fs) {
 +    super(confFactory);
 +    this.confFactory = confFactory;
 +    this.fs = fs;
 +    AccumuloConfiguration aconf = getConfiguration();
 +    Instance instance = getInstance();
 +    this.sessionManager = new SessionManager(aconf);
 +    this.logSorter = new LogSorter(instance, fs, aconf);
 +    this.replWorker = new ReplicationWorker(this, fs);
 +    this.statsKeeper = new TabletStatsKeeper();
 +    SimpleTimer.getInstance(aconf).schedule(new Runnable() {
        @Override
        public void run() {
 -
 -        final ScanSession scanSession = (ScanSession) sessionManager.getSession(scanID);
 -        String oldThreadName = Thread.currentThread().getName();
 -
 -        try {
 -          if (isCancelled() || scanSession == null)
 -            return;
 -
 -          runState.set(ScanRunState.RUNNING);
 -
 -          Thread.currentThread().setName(
 -              "User: " + scanSession.user + " Start: " + scanSession.startTime + " Client: " + scanSession.client + " Tablet: " + scanSession.extent);
 -
 -          Tablet tablet = onlineTablets.get(scanSession.extent);
 -
 -          if (tablet == null) {
 -            addResult(new org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException(scanSession.extent.toThrift()));
 -            return;
 -          }
 -
 -          long t1 = System.currentTimeMillis();
 -          ScanBatch batch = scanSession.scanner.read();
 -          long t2 = System.currentTimeMillis();
 -          scanSession.nbTimes.addStat(t2 - t1);
 -
 -          // there should only be one thing on the queue at a time, so
 -          // it should be ok to call add()
 -          // instead of put()... if add() fails because queue is at
 -          // capacity it means there is code
 -          // problem somewhere
 -          addResult(batch);
 -        } catch (TabletClosedException e) {
 -          addResult(new org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException(scanSession.extent.toThrift()));
 -        } catch (IterationInterruptedException iie) {
 -          if (!isCancelled()) {
 -            log.warn("Iteration interrupted, when scan not cancelled", iie);
 -            addResult(iie);
 -          }
 -        } catch (TooManyFilesException tmfe) {
 -          addResult(tmfe);
 -        } catch (Throwable e) {
 -          log.warn("exception while scanning tablet " + (scanSession == null ? "(unknown)" : scanSession.extent), e);
 -          addResult(e);
 -        } finally {
 -          runState.set(ScanRunState.FINISHED);
 -          Thread.currentThread().setName(oldThreadName);
 +        synchronized (onlineTablets) {
 +          long now = System.currentTimeMillis();
 +          for (Tablet tablet : onlineTablets.values())
 +            try {
 +              tablet.updateRates(now);
 +            } catch (Exception ex) {
 +              log.error("Error updating rates for {}", tablet.getExtent(), ex);
 +            }
          }
 -
 -      }
 -    }
 -
 -    private class LookupTask extends ScanTask<MultiScanResult> {
 -
 -      private long scanID;
 -
 -      LookupTask(long scanID) {
 -        this.scanID = scanID;
        }
 +    }, 5000, 5000);
  
 +    long walogMaxSize = getConfiguration().getMemoryInBytes(Property.TSERV_WALOG_MAX_SIZE);
 +    long minBlockSize = CachedConfiguration.getInstance().getLong("dfs.namenode.fs-limits.min-block-size", 0);
 +    if (minBlockSize != 0 && minBlockSize > walogMaxSize)
 +      throw new RuntimeException("Unable to start TabletServer. Logger is set to use blocksize " + walogMaxSize + " but hdfs minimum block size is "
 +          + minBlockSize + ". Either increase the " + Property.TSERV_WALOG_MAX_SIZE + " or decrease dfs.namenode.fs-limits.min-block-size in hdfs-site.xml.");
 +    logger = new TabletServerLogger(this, walogMaxSize, syncCounter, flushCounter);
 +    this.resourceManager = new TabletServerResourceManager(this, fs);
 +    this.security = AuditedSecurityOperation.getInstance(this);
 +
 +    metricsFactory = new TabletServerMetricsFactory(aconf);
 +    updateMetrics = metricsFactory.createUpdateMetrics();
 +    scanMetrics = metricsFactory.createScanMetrics();
 +    mincMetrics = metricsFactory.createMincMetrics();
++    SimpleTimer.getInstance(aconf).schedule(new Runnable() {
+       @Override
+       public void run() {
 -        MultiScanSession session = (MultiScanSession) sessionManager.getSession(scanID);
 -        String oldThreadName = Thread.currentThread().getName();
 -
 -        try {
 -          if (isCancelled() || session == null)
 -            return;
 -
 -          TableConfiguration acuTableConf = ServerConfiguration.getTableConfiguration(instance, session.threadPoolExtent.getTableId().toString());
 -          long maxResultsSize = acuTableConf.getMemoryInBytes(Property.TABLE_SCAN_MAXMEM);
 -
 -          runState.set(ScanRunState.RUNNING);
 -          Thread.currentThread().setName("Client: " + session.client + " User: " + session.user + " Start: " + session.startTime + " Table: ");
 -
 -          long bytesAdded = 0;
 -          long maxScanTime = 4000;
 -
 -          long startTime = System.currentTimeMillis();
++        TabletLocator.clearLocators();
++      }
++    }, jitter(TIME_BETWEEN_LOCATOR_CACHE_CLEARS), jitter(TIME_BETWEEN_LOCATOR_CACHE_CLEARS));
++  }
+ 
 -          ArrayList<KVEntry> results = new ArrayList<KVEntry>();
 -          Map<KeyExtent,List<Range>> failures = new HashMap<KeyExtent,List<Range>>();
 -          ArrayList<KeyExtent> fullScans = new ArrayList<KeyExtent>();
 -          KeyExtent partScan = null;
 -          Key partNextKey = null;
 -          boolean partNextKeyInclusive = false;
++  private static long jitter(long ms) {
++    Random r = new Random();
++    // add a random 10% wait
++    return (long)((1. + (r.nextDouble() / 10)) * ms);
 +  }
  
 -          Iterator<Entry<KeyExtent,List<Range>>> iter = session.queries.entrySet().iterator();
 +  private final SessionManager sessionManager;
  
 -          // check the time so that the read ahead thread is not monopolized
 -          while (iter.hasNext() && bytesAdded < maxResultsSize && (System.currentTimeMillis() - startTime) < maxScanTime) {
 -            Entry<KeyExtent,List<Range>> entry = iter.next();
 +  private final WriteTracker writeTracker = new WriteTracker();
  
 -            iter.remove();
 +  private final RowLocks rowLocks = new RowLocks();
  
 -            // check that tablet server is serving requested tablet
 -            Tablet tablet = onlineTablets.get(entry.getKey());
 -            if (tablet == null) {
 -              failures.put(entry.getKey(), entry.getValue());
 -              continue;
 -            }
 -            Thread.currentThread().setName(
 -                "Client: " + session.client + " User: " + session.user + " Start: " + session.startTime + " Tablet: " + entry.getKey().toString());
 +  private final AtomicLong totalQueuedMutationSize = new AtomicLong(0);
 +  private final ReentrantLock recoveryLock = new ReentrantLock(true);
  
 -            LookupResult lookupResult;
 -            try {
 +  private class ThriftClientHandler extends ClientServiceHandler implements TabletClientService.Iface {
  
 -              // do the following check to avoid a race condition
 -              // between setting false below and the task being
 -              // canceled
 -              if (isCancelled())
 -                interruptFlag.set(true);
 +    ThriftClientHandler() {
 +      super(TabletServer.this, watcher, fs);
 +      log.debug(ThriftClientHandler.class.getName() + " created");
 +    }
  
 -              lookupResult = tablet.lookup(entry.getValue(), session.columnSet, session.auths, results, maxResultsSize - bytesAdded, session.ssiList,
 -                  session.ssio, interruptFlag);
 +    @Override
 +    public List<TKeyExtent> bulkImport(TInfo tinfo, TCredentials credentials, long tid, Map<TKeyExtent,Map<String,MapFileInfo>> files, boolean setTime)
 +        throws ThriftSecurityException {
  
 -              // if the tablet was closed it it possible that the
 -              // interrupt flag was set.... do not want it set for
 -              // the next
 -              // lookup
 -              interruptFlag.set(false);
 +      if (!security.canPerformSystemActions(credentials))
 +        throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
  
 -            } catch (IOException e) {
 -              log.warn("lookup failed for tablet " + entry.getKey(), e);
 -              throw new RuntimeException(e);
 -            }
 +      List<TKeyExtent> failures = new ArrayList<TKeyExtent>();
  
 -            bytesAdded += lookupResult.bytesAdded;
 +      for (Entry<TKeyExtent,Map<String,MapFileInfo>> entry : files.entrySet()) {
 +        TKeyExtent tke = entry.getKey();
 +        Map<String,MapFileInfo> fileMap = entry.getValue();
 +        Map<FileRef,MapFileInfo> fileRefMap = new HashMap<FileRef,MapFileInfo>();
 +        for (Entry<String,MapFileInfo> mapping : fileMap.entrySet()) {
 +          Path path = new Path(mapping.getKey());
 +          FileSystem ns = fs.getVolumeByPath(path).getFileSystem();
 +          path = ns.makeQualified(path);
 +          fileRefMap.put(new FileRef(path.toString(), path), mapping.getValue());
 +        }
  
 -            if (lookupResult.unfinishedRanges.size() > 0) {
 -              if (lookupResult.closed) {
 -                failures.put(entry.getKey(), lookupResult.unfinishedRanges);
 -              } else {
 -                session.queries.put(entry.getKey(), lookupResult.unfinishedRanges);
 -                partScan = entry.getKey();
 -                partNextKey = lookupResult.unfinishedRanges.get(0).getStartKey();
 -                partNextKeyInclusive = lookupResult.unfinishedRanges.get(0).isStartKeyInclusive();
 -              }
 -            } else {
 -              fullScans.add(entry.getKey());
 -            }
 -          }
 +        Tablet importTablet = onlineTablets.get(new KeyExtent(tke));
  
 -          long finishTime = System.currentTimeMillis();
 -          session.totalLookupTime += (finishTime - startTime);
 -          session.numEntries += results.size();
 -
 -          // convert everything to thrift before adding result
 -          List<TKeyValue> retResults = new ArrayList<TKeyValue>();
 -          for (KVEntry entry : results)
 -            retResults.add(new TKeyValue(entry.getKey().toThrift(), ByteBuffer.wrap(entry.getValue().get())));
 -          Map<TKeyExtent,List<TRange>> retFailures = Translator.translate(failures, Translators.KET,
 -              new Translator.ListTranslator<Range,TRange>(Translators.RT));
 -          List<TKeyExtent> retFullScans = Translator.translate(fullScans, Translators.KET);
 -          TKeyExtent retPartScan = null;
 -          TKey retPartNextKey = null;
 -          if (partScan != null) {
 -            retPartScan = partScan.toThrift();
 -            retPartNextKey = partNextKey.toThrift();
 -          }
 -          // add results to queue
 -          addResult(new MultiScanResult(retResults, retFailures, retFullScans, retPartScan, retPartNextKey, partNextKeyInclusive, session.queries.size() != 0));
 -        } catch (IterationInterruptedException iie) {
 -          if (!isCancelled()) {
 -            log.warn("Iteration interrupted, when scan not cancelled", iie);
 -            addResult(iie);
 +        if (importTablet == null) {
 +          failures.add(tke);
 +        } else {
 +          try {
 +            importTablet.importMapFiles(tid, fileRefMap, setTime);
 +          } catch (IOException ioe) {
 +            log.info("files " + fileMap.keySet() + " not imported to " + new KeyExtent(tke) + ": " + ioe.getMessage());
 +            failures.add(tke);
            }
 -        } catch (Throwable e) {
 -          log.warn("exception while doing multi-scan ", e);
 -          addResult(e);
 -        } finally {
 -          Thread.currentThread().setName(oldThreadName);
 -          runState.set(ScanRunState.FINISHED);
          }
        }
 +      return failures;
      }
  
      @Override


Mime
View raw message