accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e..@apache.org
Subject git commit: ACCUMULO-1605
Date Mon, 05 Aug 2013 12:20:58 GMT
Updated Branches:
  refs/heads/master 040f89121 -> ff226a789


ACCUMULO-1605

Added many finally clauses to close spans in the presence of exceptions.

I suspect that the extreme depth of the traces is caused by traces that
are started and never stopped.  This most likely happens in the presence
of errors, where exceptions jump around the span.stop calls.
In particular, this happens when a file is missing for a compaction.  This is known
to occur in 1.4 when bulk imports fail.  See ACCUMULO-1044.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/ff226a78
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/ff226a78
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/ff226a78

Branch: refs/heads/master
Commit: ff226a78995057518ac354e671d90f1bb4c30884
Parents: 040f891
Author: Eric Newton <ecn@apache.org>
Authored: Thu Aug 1 10:07:06 2013 -0400
Committer: Eric Newton <ecn@apache.org>
Committed: Mon Aug 5 08:19:58 2013 -0400

----------------------------------------------------------------------
 .../accumulo/server/tabletserver/Tablet.java    | 105 +++++----
 .../server/tabletserver/TabletServer.java       | 230 ++++++++++---------
 2 files changed, 178 insertions(+), 157 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/ff226a78/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
index 0272a2f..427fd33 100644
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
+++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
@@ -621,36 +621,39 @@ public class Tablet {
       TreeSet<FileRef> inUse = new TreeSet<FileRef>();
       
       Span waitForScans = Trace.start("waitForScans");
-      synchronized (Tablet.this) {
-        if (blockNewScans) {
-          if (reservationsBlocked)
-            throw new IllegalStateException();
+      try {
+        synchronized (Tablet.this) {
+          if (blockNewScans) {
+            if (reservationsBlocked)
+              throw new IllegalStateException();
+            
+            reservationsBlocked = true;
+          }
           
-          reservationsBlocked = true;
-        }
-        
-        for (FileRef path : pathsToWaitFor) {
-          while (fileScanReferenceCounts.get(path) > 0 && System.currentTimeMillis()
- startTime < maxWaitTime) {
-            try {
-              Tablet.this.wait(100);
-            } catch (InterruptedException e) {
-              log.warn(e, e);
+          for (FileRef path : pathsToWaitFor) {
+            while (fileScanReferenceCounts.get(path) > 0 && System.currentTimeMillis()
- startTime < maxWaitTime) {
+              try {
+                Tablet.this.wait(100);
+              } catch (InterruptedException e) {
+                log.warn(e, e);
+              }
             }
           }
+          
+          for (FileRef path : pathsToWaitFor) {
+            if (fileScanReferenceCounts.get(path) > 0)
+              inUse.add(path);
+          }
+          
+          if (blockNewScans) {
+            reservationsBlocked = false;
+            Tablet.this.notifyAll();
+          }
+          
         }
-        
-        for (FileRef path : pathsToWaitFor) {
-          if (fileScanReferenceCounts.get(path) > 0)
-            inUse.add(path);
-        }
-        
-        if (blockNewScans) {
-          reservationsBlocked = false;
-          Tablet.this.notifyAll();
-        }
-        
+      } finally {
+        waitForScans.stop();
       }
-      waitForScans.stop();
       return inUse;
     }
     
@@ -2070,20 +2073,26 @@ public class Tablet {
     
     try {
       Span span = Trace.start("write");
-      count = memTable.getNumEntries();
-      
-      DataFileValue dfv = null;
-      if (mergeFile != null)
-        dfv = datafileManager.getDatafileSizes().get(mergeFile);
-      
-      MinorCompactor compactor = new MinorCompactor(conf, fs, memTable, mergeFile, dfv, tmpDatafile,
acuTableConf, extent, mincReason);
-      CompactionStats stats = compactor.call();
-      
-      span.stop();
+      CompactionStats stats;
+      try {
+        count = memTable.getNumEntries();
+        
+        DataFileValue dfv = null;
+        if (mergeFile != null)
+          dfv = datafileManager.getDatafileSizes().get(mergeFile);
+        
+        MinorCompactor compactor = new MinorCompactor(conf, fs, memTable, mergeFile, dfv,
tmpDatafile, acuTableConf, extent, mincReason);
+        stats = compactor.call();
+      } finally {
+        span.stop();
+      }
       span = Trace.start("bringOnline");
-      datafileManager.bringMinorCompactionOnline(tmpDatafile, newDatafile, mergeFile, new
DataFileValue(stats.getFileSize(), stats.getEntriesWritten()),
-          commitSession, flushId);
-      span.stop();
+      try {
+        datafileManager.bringMinorCompactionOnline(tmpDatafile, newDatafile, mergeFile, new
DataFileValue(stats.getFileSize(), stats.getEntriesWritten()),
+            commitSession, flushId);
+      } finally {
+        span.stop();
+      }
       return new DataFileValue(stats.getFileSize(), stats.getEntriesWritten());
     } catch (Exception E) {
       failed = true;
@@ -3317,18 +3326,18 @@ public class Tablet {
     // Always trace majC
     Span span = Trace.on("majorCompaction");
     
-    synchronized (this) {
-      // check that compaction is still needed - defer to splitting
-      majorCompactionQueued.remove(reason);
-      
-      if (closing || closed || !needsMajorCompaction(reason) || majorCompactionInProgress
|| needsSplit()) {
-        return null;
+    try {
+      synchronized (this) {
+        // check that compaction is still needed - defer to splitting
+        majorCompactionQueued.remove(reason);
+        
+        if (closing || closed || !needsMajorCompaction(reason) || majorCompactionInProgress
|| needsSplit()) {
+          return null;
+        }
+        
+        majorCompactionInProgress = true;
       }
-      
-      majorCompactionInProgress = true;
-    }
     
-    try {
       majCStats = _majorCompact(reason);
       if (reason == MajorCompactionReason.CHOP) {
         MetadataTableUtil.chopped(getExtent(), this.tabletServer.getLock());

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ff226a78/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
b/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
index 6182b27..3216731 100644
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
+++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
@@ -1528,56 +1528,58 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
         TabletServer.this.resourceManager.waitUntilCommitsAreEnabled();
       
       Span prep = Trace.start("prep");
-      for (Entry<Tablet,? extends List<Mutation>> entry : us.queuedMutations.entrySet())
{
-        
-        Tablet tablet = entry.getKey();
-        List<Mutation> mutations = entry.getValue();
-        if (mutations.size() > 0) {
-          try {
-            if (updateMetrics.isEnabled())
-              updateMetrics.add(TabletServerUpdateMetrics.mutationArraySize, mutations.size());
-            
-            CommitSession commitSession = tablet.prepareMutationsForCommit(us.cenv, mutations);
-            if (commitSession == null) {
-              if (us.currentTablet == tablet) {
-                us.currentTablet = null;
+      try {
+        for (Entry<Tablet,? extends List<Mutation>> entry : us.queuedMutations.entrySet())
{
+          
+          Tablet tablet = entry.getKey();
+          List<Mutation> mutations = entry.getValue();
+          if (mutations.size() > 0) {
+            try {
+              if (updateMetrics.isEnabled())
+                updateMetrics.add(TabletServerUpdateMetrics.mutationArraySize, mutations.size());
+              
+              CommitSession commitSession = tablet.prepareMutationsForCommit(us.cenv, mutations);
+              if (commitSession == null) {
+                if (us.currentTablet == tablet) {
+                  us.currentTablet = null;
+                }
+                us.failures.put(tablet.getExtent(), us.successfulCommits.get(tablet));
+              } else {
+                sendables.put(commitSession, mutations);
+                mutationCount += mutations.size();
               }
-              us.failures.put(tablet.getExtent(), us.successfulCommits.get(tablet));
-            } else {
-              sendables.put(commitSession, mutations);
+              
+            } catch (TConstraintViolationException e) {
+              us.violations.add(e.getViolations());
+              if (updateMetrics.isEnabled())
+                updateMetrics.add(TabletServerUpdateMetrics.constraintViolations, 0);
+              
+              if (e.getNonViolators().size() > 0) {
+                // only log and commit mutations if there were some
+                // that did not
+                // violate constraints... this is what
+                // prepareMutationsForCommit()
+                // expects
+                sendables.put(e.getCommitSession(), e.getNonViolators());
+              }
+              
               mutationCount += mutations.size();
+              
+            } catch (HoldTimeoutException t) {
+              error = t;
+              log.debug("Giving up on mutations due to a long memory hold time");
+              break;
+            } catch (Throwable t) {
+              error = t;
+              log.error("Unexpected error preparing for commit", error);
+              break;
             }
-            
-          } catch (TConstraintViolationException e) {
-            us.violations.add(e.getViolations());
-            if (updateMetrics.isEnabled())
-              updateMetrics.add(TabletServerUpdateMetrics.constraintViolations, 0);
-            
-            if (e.getNonViolators().size() > 0) {
-              // only log and commit mutations if there were some
-              // that did not
-              // violate constraints... this is what
-              // prepareMutationsForCommit()
-              // expects
-              sendables.put(e.getCommitSession(), e.getNonViolators());
-            }
-            
-            mutationCount += mutations.size();
-            
-          } catch (HoldTimeoutException t) {
-            error = t;
-            log.debug("Giving up on mutations due to a long memory hold time");
-            break;
-          } catch (Throwable t) {
-            error = t;
-            log.error("Unexpected error preparing for commit", error);
-            break;
           }
         }
+      } finally {
+        prep.stop();
       }
-      prep.stop();
       
-      Span wal = Trace.start("wal");
       long pt2 = System.currentTimeMillis();
       long avgPrepareTime = (long) ((pt2 - pt1) / (double) us.queuedMutations.size());
       us.prepareTimes.addStat(pt2 - pt1);
@@ -1591,60 +1593,66 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
         throw new RuntimeException(error);
       }
       try {
-        while (true) {
-          try {
-            long t1 = System.currentTimeMillis();
-            
-            logger.logManyTablets(sendables);
-            
-            long t2 = System.currentTimeMillis();
-            us.walogTimes.addStat(t2 - t1);
-            if (updateMetrics.isEnabled())
-              updateMetrics.add(TabletServerUpdateMetrics.waLogWriteTime, (t2 - t1));
-            
-            break;
-          } catch (IOException ex) {
-            log.warn("logging mutations failed, retrying");
-          } catch (FSError ex) { // happens when DFS is localFS
-            log.warn("logging mutations failed, retrying");
-          } catch (Throwable t) {
-            log.error("Unknown exception logging mutations, counts for mutations in flight
not decremented!", t);
-            throw new RuntimeException(t);
+        Span wal = Trace.start("wal");
+        try {
+          while (true) {
+            try {
+              long t1 = System.currentTimeMillis();
+              
+              logger.logManyTablets(sendables);
+              
+              long t2 = System.currentTimeMillis();
+              us.walogTimes.addStat(t2 - t1);
+              if (updateMetrics.isEnabled())
+                updateMetrics.add(TabletServerUpdateMetrics.waLogWriteTime, (t2 - t1));
+              
+              break;
+            } catch (IOException ex) {
+              log.warn("logging mutations failed, retrying");
+            } catch (FSError ex) { // happens when DFS is localFS
+              log.warn("logging mutations failed, retrying");
+            } catch (Throwable t) {
+              log.error("Unknown exception logging mutations, counts for mutations in flight
not decremented!", t);
+              throw new RuntimeException(t);
+            }
           }
+        } finally {
+          wal.stop();
         }
         
-        wal.stop();
-        
         Span commit = Trace.start("commit");
-        long t1 = System.currentTimeMillis();
-        for (Entry<CommitSession,? extends List<Mutation>> entry : sendables.entrySet())
{
-          CommitSession commitSession = entry.getKey();
-          List<Mutation> mutations = entry.getValue();
+        try {
+          long t1 = System.currentTimeMillis();
+          for (Entry<CommitSession,? extends List<Mutation>> entry : sendables.entrySet())
{
+            CommitSession commitSession = entry.getKey();
+            List<Mutation> mutations = entry.getValue();
+            
+            commitSession.commit(mutations);
+            
+            Tablet tablet = commitSession.getTablet();
+            
+            if (tablet == us.currentTablet) {
+              // because constraint violations may filter out some
+              // mutations, for proper
+              // accounting with the client code, need to increment
+              // the count based
+              // on the original number of mutations from the client
+              // NOT the filtered number
+              us.successfulCommits.increment(tablet, us.queuedMutations.get(tablet).size());
+            }
+          }
+          long t2 = System.currentTimeMillis();
           
-          commitSession.commit(mutations);
+          long avgCommitTime = (long) ((t2 - t1) / (double) sendables.size());
           
-          Tablet tablet = commitSession.getTablet();
+          us.flushTime += (t2 - pt1);
+          us.commitTimes.addStat(t2 - t1);
           
-          if (tablet == us.currentTablet) {
-            // because constraint violations may filter out some
-            // mutations, for proper
-            // accounting with the client code, need to increment
-            // the count based
-            // on the original number of mutations from the client
-            // NOT the filtered number
-            us.successfulCommits.increment(tablet, us.queuedMutations.get(tablet).size());
-          }
+          if (updateMetrics.isEnabled())
+            updateMetrics.add(TabletServerUpdateMetrics.commitTime, avgCommitTime);
+        } finally {
+          commit.stop();
         }
-        long t2 = System.currentTimeMillis();
-        
-        long avgCommitTime = (long) ((t2 - t1) / (double) sendables.size());
-        
-        us.flushTime += (t2 - pt1);
-        us.commitTimes.addStat(t2 - t1);
-        
-        if (updateMetrics.isEnabled())
-          updateMetrics.add(TabletServerUpdateMetrics.commitTime, avgCommitTime);
-        commit.stop();
       } finally {
         us.queuedMutations.clear();
         if (us.currentTablet != null) {
@@ -1716,8 +1724,12 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
         List<Mutation> mutations = Collections.singletonList(mutation);
         
         Span prep = Trace.start("prep");
-        CommitSession cs = tablet.prepareMutationsForCommit(new TservConstraintEnv(security,
credentials), mutations);
-        prep.stop();
+        CommitSession cs;
+        try {
+          cs = tablet.prepareMutationsForCommit(new TservConstraintEnv(security, credentials),
mutations);
+        } finally {
+          prep.stop();
+        }
         if (cs == null) {
           throw new NotServingTabletException(tkeyExtent);
         }
@@ -1725,8 +1737,11 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
         while (true) {
           try {
             Span wal = Trace.start("wal");
-            logger.log(cs, cs.getWALogSeq(), mutation);
-            wal.stop();
+            try {
+              logger.log(cs, cs.getWALogSeq(), mutation);
+            } finally {
+              wal.stop();
+            }
             break;
           } catch (IOException ex) {
             log.warn(ex, ex);
@@ -1734,8 +1749,11 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
         }
         
         Span commit = Trace.start("commit");
-        cs.commit(mutations);
-        commit.stop();
+        try {
+          cs.commit(mutations);
+        } finally {
+          commit.stop();
+        }
       } catch (TConstraintViolationException e) {
         throw new ConstraintViolationException(Translator.translate(e.getViolations().asList(),
Translator.CVST));
       } finally {
@@ -2913,11 +2931,11 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
     MetadataTableUtil.addLogEntry(SystemCredentials.get(), entry, getLock());
   }
   
-  private int startServer(AccumuloConfiguration conf, String address, Property portHint,
TProcessor processor, String threadName) throws UnknownHostException {
+  private InetSocketAddress startServer(AccumuloConfiguration conf, String address, Property
portHint, TProcessor processor, String threadName) throws UnknownHostException {
     ServerAddress sp = TServerUtils.startServer(conf, address, portHint, processor, this.getClass().getSimpleName(),
threadName, Property.TSERV_PORTSEARCH,
         Property.TSERV_MINTHREADS, Property.TSERV_THREADCHECK, Property.GENERAL_MAX_MESSAGE_SIZE);
     this.server = sp.server;
-    return sp.address.getPort();
+    return sp.address;
   }
   
   private String getMasterAddress() {
@@ -2953,13 +2971,13 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
     ThriftUtil.returnClient(client);
   }
   
-  private int startTabletClientService() throws UnknownHostException {
+  private InetSocketAddress startTabletClientService() throws UnknownHostException {
     // start listening for client connection last
     Iface tch = TraceWrap.service(new ThriftClientHandler());
     Processor<Iface> processor = new Processor<Iface>(tch);
-    int port = startServer(getSystemConfiguration(), clientAddress.getHostName(), Property.TSERV_CLIENTPORT,
processor, "Thrift Client Server");
-    log.info("port = " + port);
-    return port;
+    InetSocketAddress address = startServer(getSystemConfiguration(), clientAddress.getHostName(),
Property.TSERV_CLIENTPORT, processor, "Thrift Client Server");
+    log.info("address = " + address);
+    return address;
   }
   
   ZooLock getLock() {
@@ -3026,17 +3044,11 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
   public void run() {
     SecurityUtil.serverLogin();
     
-    int clientPort = 0;
     try {
-      clientPort = startTabletClientService();
+      clientAddress = startTabletClientService();
     } catch (UnknownHostException e1) {
-      log.error("Unable to start tablet client service", e1);
-      UtilWaitThread.sleep(1000);
-    }
-    if (clientPort == 0) {
-      throw new RuntimeException("Failed to start the tablet client service");
+      throw new RuntimeException("Failed to start the tablet client service", e1);
     }
-    clientAddress = new InetSocketAddress(clientAddress.getHostName(), clientPort);
     announceExistence();
     
     ThreadPoolExecutor distWorkQThreadPool = new SimpleThreadPool(getSystemConfiguration().getCount(Property.TSERV_WORKQ_THREADS),
"distributed work queue");


Mime
View raw message