accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e..@apache.org
Subject [2/2] git commit: ACCUMUMLO-2255 more little cleanups and class extractions
Date Mon, 09 Jun 2014 20:01:04 GMT
ACCUMUMLO-2255 more little cleanups and class extractions


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/8070e1ca
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/8070e1ca
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/8070e1ca

Branch: refs/heads/master
Commit: 8070e1ca0eb97aa700cb910b05854ce58653a1a3
Parents: bbc3b5a
Author: Eric C. Newton <eric.newton@gmail.com>
Authored: Mon Jun 9 16:01:07 2014 -0400
Committer: Eric C. Newton <eric.newton@gmail.com>
Committed: Mon Jun 9 16:01:07 2014 -0400

----------------------------------------------------------------------
 .../apache/accumulo/tserver/SessionManager.java | 298 ++++++++++++++++
 .../apache/accumulo/tserver/TabletServer.java   | 344 +------------------
 .../apache/accumulo/tserver/WriteTracker.java   |  80 +++++
 .../apache/accumulo/tserver/tablet/Tablet.java  |  38 +-
 4 files changed, 393 insertions(+), 367 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/8070e1ca/server/tserver/src/main/java/org/apache/accumulo/tserver/SessionManager.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/SessionManager.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/SessionManager.java
new file mode 100644
index 0000000..e8fc010
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/SessionManager.java
@@ -0,0 +1,298 @@
+package org.apache.accumulo.tserver;
+
+import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TimerTask;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.impl.Translator;
+import org.apache.accumulo.core.client.impl.Translators;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.thrift.MultiScanResult;
+import org.apache.accumulo.core.tabletserver.thrift.ActiveScan;
+import org.apache.accumulo.core.tabletserver.thrift.ScanState;
+import org.apache.accumulo.core.tabletserver.thrift.ScanType;
+import org.apache.accumulo.core.util.MapCounter;
+import org.apache.accumulo.server.util.time.SimpleTimer;
+import org.apache.accumulo.tserver.TabletServer.ScanRunState;
+import org.apache.accumulo.tserver.TabletServer.ScanTask;
+import org.apache.accumulo.tserver.session.MultiScanSession;
+import org.apache.accumulo.tserver.session.ScanSession;
+import org.apache.accumulo.tserver.session.Session;
+import org.apache.accumulo.tserver.tablet.ScanBatch;
+
+public class SessionManager {
+
+  private final SecureRandom random = new SecureRandom();
+  private final Map<Long,Session> sessions = new HashMap<Long,Session>();
+  private final long maxIdle;
+  private final AccumuloConfiguration aconf;
+
+  SessionManager(AccumuloConfiguration conf) {
+    aconf = conf;
+    maxIdle = conf.getTimeInMillis(Property.TSERV_SESSION_MAXIDLE);
+
+    Runnable r = new Runnable() {
+      @Override
+      public void run() {
+        sweep(maxIdle);
+      }
+    };
+
+    SimpleTimer.getInstance(conf).schedule(r, 0, Math.max(maxIdle / 2, 1000));
+  }
+
+  synchronized long createSession(Session session, boolean reserve) {
+    long sid = random.nextLong();
+
+    while (sessions.containsKey(sid)) {
+      sid = random.nextLong();
+    }
+
+    sessions.put(sid, session);
+
+    session.reserved = reserve;
+
+    session.startTime = session.lastAccessTime = System.currentTimeMillis();
+
+    return sid;
+  }
+
+  long getMaxIdleTime() {
+    return maxIdle;
+  }
+
+  /**
+   * while a session is reserved, it cannot be canceled or removed
+   *
+   * @param sessionId
+   */
+
+  synchronized Session reserveSession(long sessionId) {
+    Session session = sessions.get(sessionId);
+    if (session != null) {
+      if (session.reserved)
+        throw new IllegalStateException();
+      session.reserved = true;
+    }
+
+    return session;
+
+  }
+
+  synchronized Session reserveSession(long sessionId, boolean wait) {
+    Session session = sessions.get(sessionId);
+    if (session != null) {
+      while (wait && session.reserved) {
+        try {
+          wait(1000);
+        } catch (InterruptedException e) {
+          throw new RuntimeException();
+        }
+      }
+
+      if (session.reserved)
+        throw new IllegalStateException();
+      session.reserved = true;
+    }
+
+    return session;
+
+  }
+
+  synchronized void unreserveSession(Session session) {
+    if (!session.reserved)
+      throw new IllegalStateException();
+    notifyAll();
+    session.reserved = false;
+    session.lastAccessTime = System.currentTimeMillis();
+  }
+
+  synchronized void unreserveSession(long sessionId) {
+    Session session = getSession(sessionId);
+    if (session != null)
+      unreserveSession(session);
+  }
+
+  synchronized Session getSession(long sessionId) {
+    Session session = sessions.get(sessionId);
+    if (session != null)
+      session.lastAccessTime = System.currentTimeMillis();
+    return session;
+  }
+
+  Session removeSession(long sessionId) {
+    return removeSession(sessionId, false);
+  }
+
+  Session removeSession(long sessionId, boolean unreserve) {
+    Session session = null;
+    synchronized (this) {
+      session = sessions.remove(sessionId);
+      if (unreserve && session != null)
+        unreserveSession(session);
+    }
+
+    // do clean up out side of lock..
+    if (session != null)
+      session.cleanup();
+
+    return session;
+  }
+
+  private void sweep(long maxIdle) {
+    List<Session> sessionsToCleanup = new ArrayList<Session>();
+    synchronized (this) {
+      Iterator<Session> iter = sessions.values().iterator();
+      while (iter.hasNext()) {
+        Session session = iter.next();
+        long idleTime = System.currentTimeMillis() - session.lastAccessTime;
+        if (idleTime > maxIdle && !session.reserved) {
+          iter.remove();
+          sessionsToCleanup.add(session);
+        }
+      }
+    }
+
+    // do clean up outside of lock
+    for (Session session : sessionsToCleanup) {
+      session.cleanup();
+    }
+  }
+
+  synchronized void removeIfNotAccessed(final long sessionId, long delay) {
+    Session session = sessions.get(sessionId);
+    if (session != null) {
+      final long removeTime = session.lastAccessTime;
+      TimerTask r = new TimerTask() {
+        @Override
+        public void run() {
+          Session sessionToCleanup = null;
+          synchronized (SessionManager.this) {
+            Session session2 = sessions.get(sessionId);
+            if (session2 != null && session2.lastAccessTime == removeTime && !session2.reserved) {
+              sessions.remove(sessionId);
+              sessionToCleanup = session2;
+            }
+          }
+
+          // call clean up outside of lock
+          if (sessionToCleanup != null)
+            sessionToCleanup.cleanup();
+        }
+      };
+
+      SimpleTimer.getInstance(aconf).schedule(r, delay);
+    }
+  }
+
+  public synchronized Map<String,MapCounter<ScanRunState>> getActiveScansPerTable() {
+    Map<String,MapCounter<ScanRunState>> counts = new HashMap<String,MapCounter<ScanRunState>>();
+    for (Entry<Long,Session> entry : sessions.entrySet()) {
+
+      Session session = entry.getValue();
+      @SuppressWarnings("rawtypes")
+      ScanTask nbt = null;
+      String tableID = null;
+
+      if (session instanceof ScanSession) {
+        ScanSession ss = (ScanSession) session;
+        nbt = ss.nextBatchTask;
+        tableID = ss.extent.getTableId().toString();
+      } else if (session instanceof MultiScanSession) {
+        MultiScanSession mss = (MultiScanSession) session;
+        nbt = mss.lookupTask;
+        tableID = mss.threadPoolExtent.getTableId().toString();
+      }
+
+      if (nbt == null)
+        continue;
+
+      ScanRunState srs = nbt.getScanRunState();
+
+      if (srs == ScanRunState.FINISHED)
+        continue;
+
+      MapCounter<ScanRunState> stateCounts = counts.get(tableID);
+      if (stateCounts == null) {
+        stateCounts = new MapCounter<ScanRunState>();
+        counts.put(tableID, stateCounts);
+      }
+
+      stateCounts.increment(srs, 1);
+    }
+
+    return counts;
+  }
+
+  public synchronized List<ActiveScan> getActiveScans() {
+
+    List<ActiveScan> activeScans = new ArrayList<ActiveScan>();
+
+    long ct = System.currentTimeMillis();
+
+    for (Entry<Long,Session> entry : sessions.entrySet()) {
+      Session session = entry.getValue();
+      if (session instanceof ScanSession) {
+        ScanSession ss = (ScanSession) session;
+
+        ScanState state = ScanState.RUNNING;
+
+        ScanTask<ScanBatch> nbt = ss.nextBatchTask;
+        if (nbt == null) {
+          state = ScanState.IDLE;
+        } else {
+          switch (nbt.getScanRunState()) {
+            case QUEUED:
+              state = ScanState.QUEUED;
+              break;
+            case FINISHED:
+              state = ScanState.IDLE;
+              break;
+            case RUNNING:
+            default:
+              /* do nothing */
+              break;
+          }
+        }
+
+        activeScans.add(new ActiveScan(ss.client, ss.getUser(), ss.extent.getTableId().toString(), ct - ss.startTime, ct - ss.lastAccessTime, ScanType.SINGLE,
+            state, ss.extent.toThrift(), Translator.translate(ss.columnSet, Translators.CT), ss.ssiList, ss.ssio, ss.auths.getAuthorizationsBB()));
+
+      } else if (session instanceof MultiScanSession) {
+        MultiScanSession mss = (MultiScanSession) session;
+
+        ScanState state = ScanState.RUNNING;
+
+        ScanTask<MultiScanResult> nbt = mss.lookupTask;
+        if (nbt == null) {
+          state = ScanState.IDLE;
+        } else {
+          switch (nbt.getScanRunState()) {
+            case QUEUED:
+              state = ScanState.QUEUED;
+              break;
+            case FINISHED:
+              state = ScanState.IDLE;
+              break;
+            case RUNNING:
+            default:
+              /* do nothing */
+              break;
+          }
+        }
+
+        activeScans.add(new ActiveScan(mss.client, mss.getUser(), mss.threadPoolExtent.getTableId().toString(), ct - mss.startTime, ct - mss.lastAccessTime,
+            ScanType.BATCH, state, mss.threadPoolExtent.toThrift(), Translator.translate(mss.columnSet, Translators.CT), mss.ssiList, mss.ssio, mss.auths
+                .getAuthorizationsBB()));
+      }
+    }
+
+    return activeScans;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/accumulo/blob/8070e1ca/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index ce84bd3..17ee446 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -24,12 +24,10 @@ import java.lang.management.ManagementFactory;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
-import java.security.SecureRandom;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
-import java.util.EnumMap;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -119,8 +117,6 @@ import org.apache.accumulo.core.tabletserver.thrift.ActiveScan;
 import org.apache.accumulo.core.tabletserver.thrift.ConstraintViolationException;
 import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException;
 import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
-import org.apache.accumulo.core.tabletserver.thrift.ScanState;
-import org.apache.accumulo.core.tabletserver.thrift.ScanType;
 import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
 import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Iface;
 import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Processor;
@@ -208,7 +204,6 @@ import org.apache.accumulo.tserver.metrics.TabletServerUpdateMetrics;
 import org.apache.accumulo.tserver.session.ConditionalSession;
 import org.apache.accumulo.tserver.session.MultiScanSession;
 import org.apache.accumulo.tserver.session.ScanSession;
-import org.apache.accumulo.tserver.session.Session;
 import org.apache.accumulo.tserver.session.UpdateSession;
 import org.apache.accumulo.tserver.tablet.CommitSession;
 import org.apache.accumulo.tserver.tablet.CompactionInfo;
@@ -334,278 +329,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
     this.resourceManager = new TabletServerResourceManager(getInstance(), fs);
   }
 
-  public static class SessionManager {
-
-    private final SecureRandom random = new SecureRandom();
-    private final Map<Long,Session> sessions = new HashMap<Long,Session>();
-    private final long maxIdle;
-    private final AccumuloConfiguration aconf;
-
-    SessionManager(AccumuloConfiguration conf) {
-      aconf = conf;
-      maxIdle = conf.getTimeInMillis(Property.TSERV_SESSION_MAXIDLE);
-
-      Runnable r = new Runnable() {
-        @Override
-        public void run() {
-          sweep(maxIdle);
-        }
-      };
-
-      SimpleTimer.getInstance(conf).schedule(r, 0, Math.max(maxIdle / 2, 1000));
-    }
-
-    synchronized long createSession(Session session, boolean reserve) {
-      long sid = random.nextLong();
-
-      while (sessions.containsKey(sid)) {
-        sid = random.nextLong();
-      }
-
-      sessions.put(sid, session);
-
-      session.reserved = reserve;
-
-      session.startTime = session.lastAccessTime = System.currentTimeMillis();
-
-      return sid;
-    }
-
-    long getMaxIdleTime() {
-      return maxIdle;
-    }
-
-    /**
-     * while a session is reserved, it cannot be canceled or removed
-     *
-     * @param sessionId
-     */
-
-    synchronized Session reserveSession(long sessionId) {
-      Session session = sessions.get(sessionId);
-      if (session != null) {
-        if (session.reserved)
-          throw new IllegalStateException();
-        session.reserved = true;
-      }
-
-      return session;
-
-    }
-
-    synchronized Session reserveSession(long sessionId, boolean wait) {
-      Session session = sessions.get(sessionId);
-      if (session != null) {
-        while (wait && session.reserved) {
-          try {
-            wait(1000);
-          } catch (InterruptedException e) {
-            throw new RuntimeException();
-          }
-        }
-
-        if (session.reserved)
-          throw new IllegalStateException();
-        session.reserved = true;
-      }
-
-      return session;
-
-    }
-
-    synchronized void unreserveSession(Session session) {
-      if (!session.reserved)
-        throw new IllegalStateException();
-      notifyAll();
-      session.reserved = false;
-      session.lastAccessTime = System.currentTimeMillis();
-    }
-
-    synchronized void unreserveSession(long sessionId) {
-      Session session = getSession(sessionId);
-      if (session != null)
-        unreserveSession(session);
-    }
-
-    synchronized Session getSession(long sessionId) {
-      Session session = sessions.get(sessionId);
-      if (session != null)
-        session.lastAccessTime = System.currentTimeMillis();
-      return session;
-    }
-
-    Session removeSession(long sessionId) {
-      return removeSession(sessionId, false);
-    }
-
-    Session removeSession(long sessionId, boolean unreserve) {
-      Session session = null;
-      synchronized (this) {
-        session = sessions.remove(sessionId);
-        if (unreserve && session != null)
-          unreserveSession(session);
-      }
-
-      // do clean up out side of lock..
-      if (session != null)
-        session.cleanup();
-
-      return session;
-    }
-
-    private void sweep(long maxIdle) {
-      List<Session> sessionsToCleanup = new ArrayList<Session>();
-      synchronized (this) {
-        Iterator<Session> iter = sessions.values().iterator();
-        while (iter.hasNext()) {
-          Session session = iter.next();
-          long idleTime = System.currentTimeMillis() - session.lastAccessTime;
-          if (idleTime > maxIdle && !session.reserved) {
-            iter.remove();
-            sessionsToCleanup.add(session);
-          }
-        }
-      }
-
-      // do clean up outside of lock
-      for (Session session : sessionsToCleanup) {
-        session.cleanup();
-      }
-    }
-
-    synchronized void removeIfNotAccessed(final long sessionId, long delay) {
-      Session session = sessions.get(sessionId);
-      if (session != null) {
-        final long removeTime = session.lastAccessTime;
-        TimerTask r = new TimerTask() {
-          @Override
-          public void run() {
-            Session sessionToCleanup = null;
-            synchronized (SessionManager.this) {
-              Session session2 = sessions.get(sessionId);
-              if (session2 != null && session2.lastAccessTime == removeTime && !session2.reserved) {
-                sessions.remove(sessionId);
-                sessionToCleanup = session2;
-              }
-            }
-
-            // call clean up outside of lock
-            if (sessionToCleanup != null)
-              sessionToCleanup.cleanup();
-          }
-        };
-
-        SimpleTimer.getInstance(aconf).schedule(r, delay);
-      }
-    }
-
-    public synchronized Map<String,MapCounter<ScanRunState>> getActiveScansPerTable() {
-      Map<String,MapCounter<ScanRunState>> counts = new HashMap<String,MapCounter<ScanRunState>>();
-      for (Entry<Long,Session> entry : sessions.entrySet()) {
-
-        Session session = entry.getValue();
-        @SuppressWarnings("rawtypes")
-        ScanTask nbt = null;
-        String tableID = null;
-
-        if (session instanceof ScanSession) {
-          ScanSession ss = (ScanSession) session;
-          nbt = ss.nextBatchTask;
-          tableID = ss.extent.getTableId().toString();
-        } else if (session instanceof MultiScanSession) {
-          MultiScanSession mss = (MultiScanSession) session;
-          nbt = mss.lookupTask;
-          tableID = mss.threadPoolExtent.getTableId().toString();
-        }
-
-        if (nbt == null)
-          continue;
-
-        ScanRunState srs = nbt.getScanRunState();
-
-        if (srs == ScanRunState.FINISHED)
-          continue;
-
-        MapCounter<ScanRunState> stateCounts = counts.get(tableID);
-        if (stateCounts == null) {
-          stateCounts = new MapCounter<ScanRunState>();
-          counts.put(tableID, stateCounts);
-        }
-
-        stateCounts.increment(srs, 1);
-      }
-
-      return counts;
-    }
-
-    public synchronized List<ActiveScan> getActiveScans() {
-
-      List<ActiveScan> activeScans = new ArrayList<ActiveScan>();
-
-      long ct = System.currentTimeMillis();
-
-      for (Entry<Long,Session> entry : sessions.entrySet()) {
-        Session session = entry.getValue();
-        if (session instanceof ScanSession) {
-          ScanSession ss = (ScanSession) session;
-
-          ScanState state = ScanState.RUNNING;
-
-          ScanTask<ScanBatch> nbt = ss.nextBatchTask;
-          if (nbt == null) {
-            state = ScanState.IDLE;
-          } else {
-            switch (nbt.getScanRunState()) {
-              case QUEUED:
-                state = ScanState.QUEUED;
-                break;
-              case FINISHED:
-                state = ScanState.IDLE;
-                break;
-              case RUNNING:
-              default:
-                /* do nothing */
-                break;
-            }
-          }
-
-          activeScans.add(new ActiveScan(ss.client, ss.getUser(), ss.extent.getTableId().toString(), ct - ss.startTime, ct - ss.lastAccessTime, ScanType.SINGLE,
-              state, ss.extent.toThrift(), Translator.translate(ss.columnSet, Translators.CT), ss.ssiList, ss.ssio, ss.auths.getAuthorizationsBB()));
-
-        } else if (session instanceof MultiScanSession) {
-          MultiScanSession mss = (MultiScanSession) session;
-
-          ScanState state = ScanState.RUNNING;
-
-          ScanTask<MultiScanResult> nbt = mss.lookupTask;
-          if (nbt == null) {
-            state = ScanState.IDLE;
-          } else {
-            switch (nbt.getScanRunState()) {
-              case QUEUED:
-                state = ScanState.QUEUED;
-                break;
-              case FINISHED:
-                state = ScanState.IDLE;
-                break;
-              case RUNNING:
-              default:
-                /* do nothing */
-                break;
-            }
-          }
-
-          activeScans.add(new ActiveScan(mss.client, mss.getUser(), mss.threadPoolExtent.getTableId().toString(), ct - mss.startTime, ct - mss.lastAccessTime,
-              ScanType.BATCH, state, mss.threadPoolExtent.toThrift(), Translator.translate(mss.columnSet, Translators.CT), mss.ssiList, mss.ssio, mss.auths
-                  .getAuthorizationsBB()));
-        }
-      }
-
-      return activeScans;
-    }
-  }
-
-  public abstract class ScanTask<T> implements RunnableFuture<T> {
+  public static abstract class ScanTask<T> implements RunnableFuture<T> {
 
     protected AtomicBoolean interruptFlag;
     protected ArrayBlockingQueue<Object> resultQueue;
@@ -703,70 +427,6 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
 
   }
 
-  /**
-   * This little class keeps track of writes in progress and allows readers to wait for writes that started before the read. It assumes that the operation ids
-   * are monotonically increasing.
-   *
-   */
-  static class WriteTracker {
-    private static final AtomicLong operationCounter = new AtomicLong(1);
-    private final Map<TabletType,TreeSet<Long>> inProgressWrites = new EnumMap<TabletType,TreeSet<Long>>(TabletType.class);
-
-    WriteTracker() {
-      for (TabletType ttype : TabletType.values()) {
-        inProgressWrites.put(ttype, new TreeSet<Long>());
-      }
-    }
-
-    synchronized long startWrite(TabletType ttype) {
-      long operationId = operationCounter.getAndIncrement();
-      inProgressWrites.get(ttype).add(operationId);
-      return operationId;
-    }
-
-    synchronized void finishWrite(long operationId) {
-      if (operationId == -1)
-        return;
-
-      boolean removed = false;
-
-      for (TabletType ttype : TabletType.values()) {
-        removed = inProgressWrites.get(ttype).remove(operationId);
-        if (removed)
-          break;
-      }
-
-      if (!removed) {
-        throw new IllegalArgumentException("Attempted to finish write not in progress,  operationId " + operationId);
-      }
-
-      this.notifyAll();
-    }
-
-    synchronized void waitForWrites(TabletType ttype) {
-      long operationId = operationCounter.getAndIncrement();
-      while (inProgressWrites.get(ttype).floor(operationId) != null) {
-        try {
-          this.wait();
-        } catch (InterruptedException e) {
-          log.error(e, e);
-        }
-      }
-    }
-
-    public long startWrite(Set<Tablet> keySet) {
-      if (keySet.size() == 0)
-        return -1;
-
-      List<KeyExtent> extents = new ArrayList<KeyExtent>(keySet.size());
-
-      for (Tablet tablet : keySet)
-        extents.add(tablet.getExtent());
-
-      return startWrite(TabletType.type(extents));
-    }
-  }
-
   public AccumuloConfiguration getSystemConfiguration() {
     return serverConfig.getConfiguration();
   }
@@ -3496,7 +3156,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
     if (this.isEnabled()) {
       int result = 0;
       for (Tablet tablet : Collections.unmodifiableCollection(onlineTablets.values())) {
-        if (tablet.majorCompactionRunning())
+        if (tablet.isMajorCompactionRunning())
           result++;
       }
       return result;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/8070e1ca/server/tserver/src/main/java/org/apache/accumulo/tserver/WriteTracker.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/WriteTracker.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/WriteTracker.java
new file mode 100644
index 0000000..2eed484
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/WriteTracker.java
@@ -0,0 +1,80 @@
+package org.apache.accumulo.tserver;
+
+import java.util.ArrayList;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.accumulo.core.client.impl.TabletType;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.tserver.tablet.Tablet;
+import org.apache.log4j.Logger;
+
+/**
+ * This little class keeps track of writes in progress and allows readers to wait for writes that started before the read. It assumes that the operation ids
+ * are monotonically increasing.
+ *
+ */
+class WriteTracker {
+  private static Logger log = Logger.getLogger(WriteTracker.class);
+  
+  private static final AtomicLong operationCounter = new AtomicLong(1);
+  private final Map<TabletType,TreeSet<Long>> inProgressWrites = new EnumMap<TabletType,TreeSet<Long>>(TabletType.class);
+
+  WriteTracker() {
+    for (TabletType ttype : TabletType.values()) {
+      inProgressWrites.put(ttype, new TreeSet<Long>());
+    }
+  }
+
+  synchronized long startWrite(TabletType ttype) {
+    long operationId = operationCounter.getAndIncrement();
+    inProgressWrites.get(ttype).add(operationId);
+    return operationId;
+  }
+
+  synchronized void finishWrite(long operationId) {
+    if (operationId == -1)
+      return;
+
+    boolean removed = false;
+
+    for (TabletType ttype : TabletType.values()) {
+      removed = inProgressWrites.get(ttype).remove(operationId);
+      if (removed)
+        break;
+    }
+
+    if (!removed) {
+      throw new IllegalArgumentException("Attempted to finish write not in progress,  operationId " + operationId);
+    }
+
+    this.notifyAll();
+  }
+
+  synchronized void waitForWrites(TabletType ttype) {
+    long operationId = operationCounter.getAndIncrement();
+    while (inProgressWrites.get(ttype).floor(operationId) != null) {
+      try {
+        this.wait();
+      } catch (InterruptedException e) {
+        log.error(e, e);
+      }
+    }
+  }
+
+  public long startWrite(Set<Tablet> keySet) {
+    if (keySet.size() == 0)
+      return -1;
+
+    List<KeyExtent> extents = new ArrayList<KeyExtent>(keySet.size());
+
+    for (Tablet tablet : keySet)
+      extents.add(tablet.getExtent());
+
+    return startWrite(TabletType.type(extents));
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/accumulo/blob/8070e1ca/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index 8d800d8..b615925 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -261,7 +261,7 @@ public class Tablet implements TabletCommitter {
   }
 
   /**
-   * Only visibile for testing
+   * Only visible for testing
    */
   @VisibleForTesting
   protected Tablet(TabletTime tabletTime, String tabletDirectory, int logId, Path location, DatafileManager datafileManager, TabletServer tabletServer,
@@ -346,7 +346,6 @@ public class Tablet implements TabletCommitter {
       mdScanner.setRange(new Range(rowName));
 
       for (Entry<Key,Value> entry : mdScanner) {
-
         if (entry.getKey().compareRow(rowName) != 0) {
           break;
         }
@@ -1324,7 +1323,7 @@ public class Tablet implements TabletCommitter {
 
       // wait for major compactions to finish, setting closing to
       // true should cause any running major compactions to abort
-      while (majorCompactionRunning()) {
+      while (isMajorCompactionRunning()) {
         try {
           this.wait(50);
         } catch (InterruptedException e) {
@@ -1510,7 +1509,7 @@ public class Tablet implements TabletCommitter {
 
   public synchronized boolean initiateMajorCompaction(MajorCompactionReason reason) {
 
-    if (isClosing() || isClosed() || !needsMajorCompaction(reason) || majorCompactionRunning() || majorCompactionQueued.contains(reason)) {
+    if (isClosing() || isClosed() || !needsMajorCompaction(reason) || isMajorCompactionRunning() || majorCompactionQueued.contains(reason)) {
       return false;
     }
 
@@ -1526,7 +1525,7 @@ public class Tablet implements TabletCommitter {
    * 
    */
   public boolean needsMajorCompaction(MajorCompactionReason reason) {
-    if (majorCompactionRunning())
+    if (isMajorCompactionRunning())
       return false;
     if (reason == MajorCompactionReason.CHOP || reason == MajorCompactionReason.USER)
       return true;
@@ -1534,7 +1533,7 @@ public class Tablet implements TabletCommitter {
   }
 
   /**
-   * Returns an int representing the total block size of the f served by this tablet.
+   * Returns an int representing the total block size of the files served by this tablet.
    * 
    * @return size
    */
@@ -1695,14 +1694,9 @@ public class Tablet implements TabletCommitter {
    * 
    */
   public synchronized boolean needsSplit() {
-    boolean ret;
-
     if (isClosing() || isClosed())
-      ret = false;
-    else
-      ret = findSplitRow(getDatafileManager().getFiles()) != null;
-
-    return ret;
+      return false;
+    return findSplitRow(getDatafileManager().getFiles()) != null;
   }
 
   // BEGIN PRIVATE METHODS RELATED TO MAJOR COMPACTION
@@ -1972,7 +1966,7 @@ public class Tablet implements TabletCommitter {
         // check that compaction is still needed - defer to splitting
         majorCompactionQueued.remove(reason);
 
-        if (isClosing() || isClosed ()|| !needsMajorCompaction(reason) || majorCompactionRunning() || needsSplit()) {
+        if (isClosing() || isClosed ()|| !needsMajorCompaction(reason) || isMajorCompactionRunning() || needsSplit()) {
           return null;
         }
 
@@ -2061,7 +2055,7 @@ public class Tablet implements TabletCommitter {
     return closeState == CloseState.COMPLETE;
   }
 
-  public boolean majorCompactionRunning() {
+  public boolean isMajorCompactionRunning() {
     return majorCompactionState == CompactionState.IN_PROGRESS;
   }
 
@@ -2268,12 +2262,10 @@ public class Tablet implements TabletCommitter {
 
   private Set<DfsLogger> currentLogs = new HashSet<DfsLogger>();
 
-  public Set<String> getCurrentLogFiles() {
+  public synchronized Set<String> getCurrentLogFiles() {
     Set<String> result = new HashSet<String>();
-    synchronized (currentLogs) {
-      for (DfsLogger log : currentLogs) {
-        result.add(log.getFileName());
-      }
+    for (DfsLogger log : currentLogs) {
+      result.add(log.getFileName());
     }
     return result;
   }
@@ -2425,7 +2417,7 @@ public class Tablet implements TabletCommitter {
       if (lastCompactID >= compactionId)
         return;
 
-      if (isClosing() || isClosed() || majorCompactionQueued.contains(MajorCompactionReason.USER) || majorCompactionRunning())
+      if (isClosing() || isClosed() || majorCompactionQueued.contains(MajorCompactionReason.USER) || isMajorCompactionRunning())
         return;
 
       if (getDatafileManager().getDatafileSizes().size() == 0) {
@@ -2562,10 +2554,6 @@ public class Tablet implements TabletCommitter {
     minorCompactionState = null;
   }
 
-  public boolean isMajorCompactionRunning() {
-    return majorCompactionState == CompactionState.IN_PROGRESS;
-  }
-
   public TabletStats getTabletStats() {
     return timer.getTabletStats();
   }


Mime
View raw message